re_sdk/recording_stream.rs
1use std::fmt;
2use std::io::IsTerminal as _;
3use std::sync::atomic::AtomicI64;
4use std::sync::{Arc, Weak};
5use std::time::Duration;
6
7use ahash::HashMap;
8use crossbeam::channel::{Receiver, RecvTimeoutError, Sender};
9use itertools::Either;
10use nohash_hasher::IntMap;
11use parking_lot::Mutex;
12use re_chunk::{
13 BatcherFlushError, BatcherHooks, Chunk, ChunkBatcher, ChunkBatcherConfig, ChunkBatcherError,
14 ChunkComponents, ChunkError, ChunkId, PendingRow, RowId, TimeColumn,
15};
16use re_log_types::{
17 ApplicationId, ArrowRecordBatchReleaseCallback, BlueprintActivationCommand, EntityPath, LogMsg,
18 RecordingId, StoreId, StoreInfo, StoreKind, StoreSource, TimeCell, TimeInt, TimePoint,
19 Timeline, TimelineName,
20};
21use re_quota_channel::send_crossbeam;
22use re_sdk_types::archetypes::RecordingInfo;
23use re_sdk_types::components::Timestamp;
24use re_sdk_types::{AsComponents, SerializationError, SerializedComponentColumn};
25
26use crate::binary_stream_sink::BinaryStreamStorage;
27use crate::sink::{LogSink, MemorySinkStorage, SinkFlushError};
28
29// ---
30
/// Private environment variable meant for tests.
///
/// When set, all recording streams will write to disk at the path indicated by the env-var rather
/// than doing what they were asked to do - `connect_grpc()`, `buffered()`, even `save()` will re-use the same sink.
///
/// Read through [`forced_sink_path`].
const ENV_FORCE_SAVE: &str = "_RERUN_TEST_FORCE_SAVE";
36
37/// Returns path for force sink if private environment variable `_RERUN_TEST_FORCE_SAVE` is set
38///
39/// Newly created [`RecordingStream`]s should use a [`crate::sink::FileSink`] pointing to this path.
40/// Furthermore, [`RecordingStream::set_sink`] calls after this should not swap out to a new sink but re-use the existing one.
41/// Note that creating a new [`crate::sink::FileSink`] to the same file path (even temporarily) can cause
42/// a race between file creation (and thus clearing) and pending file writes.
43pub fn forced_sink_path() -> Option<String> {
44 std::env::var(ENV_FORCE_SAVE).ok()
45}
46
/// Errors that can occur when creating/manipulating a [`RecordingStream`].
///
/// Most variants wrap an error type of an underlying subsystem and are convertible from it
/// via `From`, so that `?` can be used directly on those results.
#[derive(thiserror::Error, Debug)]
pub enum RecordingStreamError {
    /// Error within the underlying file sink.
    #[error("Failed to create the underlying file sink: {0}")]
    FileSink(#[from] re_log_encoding::FileSinkError),

    /// Error converting data into a valid chunk.
    #[error("Failed to convert data to a valid chunk: {0}")]
    Chunk(#[from] ChunkError),

    /// Error within the underlying chunk batcher.
    #[error("Failed to spawn the underlying batcher: {0}")]
    ChunkBatcher(#[from] ChunkBatcherError),

    /// Error within the underlying serializer.
    #[error("Failed to serialize component data: {0}")]
    Serialization(#[from] SerializationError),

    /// Error spawning one of the background threads.
    #[error("Failed to spawn background thread '{name}': {err}")]
    SpawnThread {
        /// Name of the thread
        name: String,

        /// Inner error explaining why the thread failed to spawn.
        err: std::io::Error,
    },

    /// Error spawning a Rerun Viewer process.
    #[error(transparent)] // makes bubbling all the way up to main look nice
    SpawnViewer(#[from] crate::SpawnError),

    /// Failure to host a web viewer and/or Rerun server.
    #[cfg(feature = "web_viewer")]
    #[error(transparent)]
    WebSink(#[from] crate::web_viewer::WebViewerSinkError),

    /// An error occurred while attempting to use a [`re_importer::Importer`].
    #[cfg(feature = "importers")]
    #[error(transparent)]
    ImporterError(#[from] re_importer::ImporterError),

    /// Invalid gRPC server address.
    #[error(transparent)]
    UriError(#[from] re_uri::Error),

    /// The given URI parsed successfully, but is not a `/proxy` endpoint.
    #[error("not a `/proxy` endpoint")]
    NotAProxyEndpoint,

    /// Invalid bind IP.
    #[error(transparent)]
    InvalidAddress(#[from] std::net::AddrParseError),
}
102
/// Results that can occur when creating/manipulating a [`RecordingStream`].
///
/// Shorthand for a [`Result`] whose error type is [`RecordingStreamError`].
pub type RecordingStreamResult<T> = Result<T, RecordingStreamError>;
105
106// ---
107
/// Construct a [`RecordingStream`].
///
/// ``` no_run
/// # use re_sdk::RecordingStreamBuilder;
/// let rec = RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
///
/// Automatically sends a [`Chunk`] with the default [`RecordingInfo`] to
/// the sink, unless an explicit `recording_id` is set via [`RecordingStreamBuilder::recording_id`].
#[derive(Debug)]
pub struct RecordingStreamBuilder {
    /// Application identifier; becomes part of the resulting [`StoreId`].
    application_id: ApplicationId,

    /// Kind of store to create; becomes part of the resulting [`StoreId`].
    store_kind: StoreKind,

    /// Explicit recording id, if any. A random one is generated when this is `None`.
    recording_id: Option<RecordingId>,

    /// Explicit store source, if any. Defaults to [`StoreSource::RustSdk`] when `None`.
    store_source: Option<StoreSource>,

    /// Fallback used to decide whether logging is enabled when `enabled` is `None`.
    default_enabled: bool,

    /// Explicit on/off switch. When set, it takes precedence over `default_enabled`.
    enabled: Option<bool>,

    /// Callbacks handed over to the batcher.
    batcher_hooks: BatcherHooks,

    /// Explicit batcher configuration, if any.
    batcher_config: Option<ChunkBatcherConfig>,

    // Optional user-defined recording properties.
    /// Whether `recording_info` should be handed over when building the stream.
    should_send_properties: bool,

    /// Recording properties (start time, optional name).
    recording_info: RecordingInfo,

    /// Optional blueprint with activation settings.
    blueprint: Option<crate::blueprint::BlueprintOpts>,
}
138
139impl RecordingStreamBuilder {
140 /// Create a new [`RecordingStreamBuilder`] with the given [`ApplicationId`].
141 ///
142 /// The [`ApplicationId`] is usually the name of your app.
143 ///
144 /// ```no_run
145 /// # use re_sdk::RecordingStreamBuilder;
146 /// let rec = RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
147 /// # Ok::<(), Box<dyn std::error::Error>>(())
148 /// ```
149 //
150 // NOTE: track_caller so that we can see if we are being called from an official example.
151 #[track_caller]
152 pub fn new(application_id: impl Into<ApplicationId>) -> Self {
153 let application_id = application_id.into();
154
155 Self {
156 application_id,
157 store_kind: StoreKind::Recording,
158 recording_id: None,
159 store_source: None,
160
161 default_enabled: true,
162 enabled: None,
163
164 batcher_config: None,
165 batcher_hooks: BatcherHooks::NONE,
166
167 should_send_properties: true,
168 recording_info: RecordingInfo::new()
169 .with_start_time(re_sdk_types::components::Timestamp::now()),
170
171 blueprint: None,
172 }
173 }
174
175 /// Create a new [`RecordingStreamBuilder`] with the given [`StoreId`].
176 //
177 // NOTE: track_caller so that we can see if we are being called from an official example.
178 #[track_caller]
179 pub fn from_store_id(store_id: &StoreId) -> Self {
180 Self {
181 application_id: store_id.application_id().clone(),
182 store_kind: store_id.kind(),
183 recording_id: Some(store_id.recording_id().clone()),
184 store_source: None,
185
186 default_enabled: true,
187 enabled: None,
188
189 batcher_config: None,
190 batcher_hooks: BatcherHooks::NONE,
191
192 should_send_properties: true,
193 recording_info: RecordingInfo::new()
194 .with_start_time(re_sdk_types::components::Timestamp::now()),
195
196 blueprint: None,
197 }
198 }
199
200 /// Set whether or not Rerun is enabled by default.
201 ///
202 /// If the `RERUN` environment variable is set, it will override this.
203 ///
204 /// Set also: [`Self::enabled`].
205 #[inline]
206 pub fn default_enabled(mut self, default_enabled: bool) -> Self {
207 self.default_enabled = default_enabled;
208 self
209 }
210
211 /// Set whether or not Rerun is enabled.
212 ///
213 /// Setting this will ignore the `RERUN` environment variable.
214 ///
215 /// Set also: [`Self::default_enabled`].
216 #[inline]
217 pub fn enabled(mut self, enabled: bool) -> Self {
218 self.enabled = Some(enabled);
219 self
220 }
221
222 /// Set the `RecordingId` for this context.
223 ///
224 /// If you're logging from multiple processes and want all the messages to end up in the same
225 /// recording, you must make sure that they all set the same `RecordingId` using this function.
226 ///
227 /// Note that many stores can share the same [`ApplicationId`], but they all have
228 /// unique `RecordingId`s.
229 ///
230 /// The default is to use a random `RecordingId`.
231 ///
232 /// When explicitly setting a `RecordingId`, the initial chunk that contains the recording
233 /// properties will not be sent.
234 #[inline]
235 pub fn recording_id(mut self, recording_id: impl Into<RecordingId>) -> Self {
236 self.recording_id = Some(recording_id.into());
237 self.send_properties(false)
238 }
239
240 /// Sets an optional name for the recording.
241 #[inline]
242 pub fn recording_name(mut self, name: impl Into<String>) -> Self {
243 self.recording_info = self.recording_info.with_name(name.into());
244 self
245 }
246
247 /// Sets an optional name for the recording.
248 #[inline]
249 pub fn recording_started(mut self, started: impl Into<Timestamp>) -> Self {
250 self.recording_info = self.recording_info.with_start_time(started);
251 self
252 }
253
254 /// Whether the [`RecordingInfo`] chunk should be sent.
255 #[inline]
256 pub fn send_properties(mut self, should_send: bool) -> Self {
257 self.should_send_properties = should_send;
258 self
259 }
260
261 /// Set a blueprint and make it active immediately.
262 ///
263 /// Use this when you want your blueprint to be shown immediately.
264 ///
265 /// To send a blueprint to an existing recording, use [`RecordingStream::send_blueprint`] instead.
266 #[inline]
267 pub fn with_blueprint(mut self, blueprint: crate::blueprint::Blueprint) -> Self {
268 self.blueprint = Some(crate::blueprint::BlueprintOpts {
269 blueprint,
270 activation: crate::blueprint::BlueprintActivation {
271 make_active: true,
272 make_default: true,
273 },
274 });
275 self
276 }
277
278 /// Set a default blueprint for this application.
279 ///
280 /// If the application already has an active blueprint, the new blueprint won't become
281 /// active until the user resets the blueprint. If you want to activate the blueprint
282 /// immediately, use [`Self::with_blueprint`] instead.
283 ///
284 /// To send a blueprint to an existing recording, use [`RecordingStream::send_blueprint`] instead.
285 #[inline]
286 pub fn with_default_blueprint(mut self, blueprint: crate::blueprint::Blueprint) -> Self {
287 self.blueprint = Some(crate::blueprint::BlueprintOpts {
288 blueprint,
289 activation: crate::blueprint::BlueprintActivation {
290 make_active: false,
291 make_default: true,
292 },
293 });
294 self
295 }
296
297 /// Specifies the configuration of the internal data batching mechanism.
298 ///
299 /// If not set, the default configuration for the currently active sink will be used.
300 /// Any environment variables as specified on [`ChunkBatcherConfig`] will always override respective settings.
301 ///
302 /// See [`ChunkBatcher`] & [`ChunkBatcherConfig`] for more information.
303 #[inline]
304 pub fn batcher_config(mut self, config: ChunkBatcherConfig) -> Self {
305 self.batcher_config = Some(config);
306 self
307 }
308
309 /// Specifies callbacks for the batcher thread.
310 ///
311 /// See [`ChunkBatcher`] & [`BatcherHooks`] for more information.
312 #[inline]
313 pub fn batcher_hooks(mut self, hooks: BatcherHooks) -> Self {
314 self.batcher_hooks = hooks;
315 self
316 }
317
318 #[doc(hidden)]
319 #[inline]
320 pub fn store_source(mut self, store_source: StoreSource) -> Self {
321 self.store_source = Some(store_source);
322 self
323 }
324
325 #[doc(hidden)]
326 #[inline]
327 pub fn blueprint(mut self) -> Self {
328 self.store_kind = StoreKind::Blueprint;
329 self
330 }
331
332 /// Creates a new [`RecordingStream`] that starts in a buffering state (RAM).
333 ///
334 /// ## Example
335 ///
336 /// ```
337 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").buffered()?;
338 /// # Ok::<(), Box<dyn std::error::Error>>(())
339 /// ```
340 pub fn buffered(self) -> RecordingStreamResult<RecordingStream> {
341 self.create_recording_stream("buffered", || {
342 Ok(Box::new(crate::log_sink::BufferedSink::new()))
343 })
344 }
345
346 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
347 /// [`crate::log_sink::MemorySink`].
348 ///
349 /// ## Example
350 ///
351 /// ```
352 /// # fn log_data(_: &re_sdk::RecordingStream) { }
353 ///
354 /// let (rec, storage) = re_sdk::RecordingStreamBuilder::new("rerun_example_app").memory()?;
355 ///
356 /// log_data(&rec);
357 ///
358 /// let data = storage.take();
359 ///
360 /// # Ok::<(), Box<dyn std::error::Error>>(())
361 /// ```
362 pub fn memory(
363 self,
364 ) -> RecordingStreamResult<(RecordingStream, crate::log_sink::MemorySinkStorage)> {
365 let rec = self.create_recording_stream("memory", || {
366 Ok(Box::new(crate::log_sink::BufferedSink::new()))
367 })?;
368
369 let sink = crate::log_sink::MemorySink::new(rec.clone());
370 let storage = sink.buffer();
371 // Using set_sink here is necessary because the MemorySink needs to know
372 // it's own RecordingStream, which means we can't use `new` above.
373 // This has the downside of a bit of creation overhead and an extra StoreInfo
374 // message being sent to the sink.
375 // TODO(jleibs): Figure out a cleaner way to handle this.
376 rec.set_sink(Box::new(sink));
377 Ok((rec, storage))
378 }
379
380 /// Creates a new [`RecordingStream`] pre-configured to stream data to multiple sinks.
381 ///
382 /// Currently only supports [`GrpcSink`][grpc_sink] and [`FileSink`][file_sink].
383 ///
384 /// If the batcher configuration has not been set explicitly or by environment variables,
385 /// this will change the batcher configuration to a conservative (less often flushing) mix of
386 /// default configurations of the underlying sinks.
387 ///
388 /// [grpc_sink]: crate::sink::GrpcSink
389 /// [file_sink]: crate::sink::FileSink
390 pub fn set_sinks(
391 self,
392 sinks: impl crate::sink::IntoMultiSink,
393 ) -> RecordingStreamResult<RecordingStream> {
394 self.create_recording_stream("set_sinks", || Ok(Box::new(sinks.into_multi_sink())))
395 }
396
397 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
398 /// remote Rerun instance.
399 ///
400 /// See also [`Self::connect_grpc_opts`] if you wish to configure the connection.
401 ///
402 /// ## Example
403 ///
404 /// ```no_run
405 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").connect_grpc()?;
406 /// # Ok::<(), Box<dyn std::error::Error>>(())
407 /// ```
408 pub fn connect_grpc(self) -> RecordingStreamResult<RecordingStream> {
409 self.connect_grpc_opts(format!(
410 "rerun+http://127.0.0.1:{}/proxy",
411 crate::DEFAULT_SERVER_PORT
412 ))
413 }
414
415 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
416 /// remote Rerun instance.
417 ///
418 /// ## Example
419 ///
420 /// ```no_run
421 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
422 /// .connect_grpc_opts("rerun+http://127.0.0.1:9876/proxy")?;
423 /// # Ok::<(), Box<dyn std::error::Error>>(())
424 /// ```
425 pub fn connect_grpc_opts(
426 self,
427 url: impl Into<String>,
428 ) -> RecordingStreamResult<RecordingStream> {
429 self.create_recording_stream("connect_grpc", || {
430 let url: String = url.into();
431 let re_uri::RedapUri::Proxy(uri) = url.as_str().parse()? else {
432 return Err(RecordingStreamError::NotAProxyEndpoint);
433 };
434 Ok(Box::new(crate::log_sink::GrpcSink::new(uri)))
435 })
436 }
437
438 #[cfg(feature = "server")]
439 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
440 /// locally hosted gRPC server.
441 ///
442 /// The server is hosted on the default IP and port, and may be connected to by any SDK or Viewer
443 /// at `rerun+http://127.0.0.1:9876/proxy` or by just running `rerun --connect`.
444 ///
445 /// To configure the gRPC server's IP and port, use [`Self::serve_grpc_opts`] instead.
446 ///
447 /// The gRPC server will buffer in memory so that late connecting viewers will still get all the data.
448 /// You can control the amount of data buffered by the gRPC server using [`Self::serve_grpc_opts`].
449 /// Once the memory limit is reached, the earliest logged data
450 /// will be dropped. Static data is never dropped.
451 ///
452 /// NOTE: When the `RecordingStream` is dropped or disconnected, it will shut down the gRPC server.
453 pub fn serve_grpc(self) -> RecordingStreamResult<RecordingStream> {
454 use re_grpc_server::ServerOptions;
455
456 self.serve_grpc_opts(
457 "0.0.0.0",
458 crate::DEFAULT_SERVER_PORT,
459 ServerOptions {
460 memory_limit: re_memory::MemoryLimit::from_fraction_of_total(0.25),
461 ..Default::default()
462 },
463 )
464 }
465
466 #[cfg(feature = "server")]
467 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
468 /// locally hosted gRPC server.
469 ///
470 /// The server is hosted on the given `bind_ip` and `port`, may be connected to by any SDK or Viewer
471 /// at `rerun+http://{bind_ip}:{port}/proxy`.
472 ///
473 /// `0.0.0.0` is a good default for `bind_ip`.
474 ///
475 /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
476 /// You can control the amount of data buffered by the gRPC server with the `server_options` argument.
477 /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
478 ///
479 /// NOTE: When the `RecordingStream` is dropped or disconnected, it will shut down the gRPC server.
480 pub fn serve_grpc_opts(
481 self,
482 bind_ip: impl AsRef<str>,
483 port: u16,
484 server_options: re_grpc_server::ServerOptions,
485 ) -> RecordingStreamResult<RecordingStream> {
486 self.create_recording_stream("serve_grpc", || {
487 Ok(Box::new(crate::grpc_server::GrpcServerSink::new(
488 bind_ip.as_ref(),
489 port,
490 server_options,
491 )?))
492 })
493 }
494
495 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to an
496 /// RRD file on disk.
497 ///
498 /// The Rerun Viewer is able to read continuously from the resulting rrd file while it is being written.
499 /// However, depending on your OS and configuration, changes may not be immediately visible due to file caching.
500 /// This is a common issue on Windows and (to a lesser extent) on `MacOS`.
501 ///
502 /// ## Example
503 ///
504 /// ```no_run
505 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
506 /// # Ok::<(), Box<dyn std::error::Error>>(())
507 /// ```
508 #[cfg(not(target_arch = "wasm32"))]
509 pub fn save(
510 self,
511 path: impl Into<std::path::PathBuf>,
512 ) -> RecordingStreamResult<RecordingStream> {
513 self.create_recording_stream("save", || Ok(Box::new(crate::sink::FileSink::new(path)?)))
514 }
515
516 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to stdout.
517 ///
518 /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
519 /// default back to `buffered` mode, in order not to break the user's terminal.
520 ///
521 /// ## Example
522 ///
523 /// ```no_run
524 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").stdout()?;
525 /// # Ok::<(), Box<dyn std::error::Error>>(())
526 /// ```
527 #[cfg(not(target_arch = "wasm32"))]
528 pub fn stdout(self) -> RecordingStreamResult<RecordingStream> {
529 if std::io::stdout().is_terminal() {
530 re_log::debug!("Ignored call to stdout() because stdout is a terminal");
531 return self.buffered();
532 }
533
534 self.create_recording_stream("stdout", || Ok(Box::new(crate::sink::FileSink::stdout()?)))
535 }
536
537 /// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
538 /// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over gRPC.
539 ///
540 /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
541 /// that viewer instead of starting a new one.
542 ///
    /// See also [`Self::spawn_opts`] if you wish to configure the behavior of the Rerun process
544 /// as well as the underlying connection.
545 ///
546 /// ## Example
547 ///
548 /// ```no_run
549 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").spawn()?;
550 /// # Ok::<(), Box<dyn std::error::Error>>(())
551 /// ```
552 pub fn spawn(self) -> RecordingStreamResult<RecordingStream> {
553 self.spawn_opts(&Default::default())
554 }
555
556 /// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
557 /// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over gRPC.
558 ///
559 /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
560 /// that viewer instead of starting a new one.
561 ///
562 /// The behavior of the spawned Viewer can be configured via `opts`.
563 /// If you're fine with the default behavior, refer to the simpler [`Self::spawn`].
564 ///
565 /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
566 /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
567 /// call to `flush` to block indefinitely if a connection cannot be established.
568 ///
569 /// ## Example
570 ///
571 /// ```no_run
572 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
573 /// .spawn_opts(&re_sdk::SpawnOptions::default())?;
574 /// # Ok::<(), Box<dyn std::error::Error>>(())
575 /// ```
576 pub fn spawn_opts(self, opts: &crate::SpawnOptions) -> RecordingStreamResult<RecordingStream> {
577 if !self.is_enabled() {
578 re_log::debug!("Rerun disabled - call to spawn() ignored");
579 return Ok(RecordingStream::disabled());
580 }
581
582 // NOTE: If `_RERUN_TEST_FORCE_SAVE` is set, all recording streams will write to disk no matter
583 // what, thus spawning a viewer is pointless (and probably not intended).
584 if forced_sink_path().is_some() {
585 let url = format!("rerun+http://{}/proxy", opts.connect_addr());
586 return self.connect_grpc_opts(url);
587 }
588
589 // Spawn viewer and connect normally.
590 // spawn() returns the actual port used, which may differ from opts.port when --new picks a free port.
591 let actual_port = crate::spawn(opts)?;
592 let addr = std::net::SocketAddr::new(
593 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
594 actual_port,
595 );
596 self.connect_grpc_opts(format!("rerun+http://{addr}/proxy"))
597 }
598
599 /// Returns whether or not logging is enabled, a [`StoreInfo`], the associated batcher
600 /// configuration, and the blueprint.
601 ///
602 /// This can be used to then construct a [`RecordingStream`] manually using
603 /// [`RecordingStream::new`].
604 pub fn into_args(
605 self,
606 ) -> (
607 bool,
608 StoreInfo,
609 Option<RecordingInfo>,
610 Option<ChunkBatcherConfig>,
611 BatcherHooks,
612 Option<crate::blueprint::BlueprintOpts>,
613 ) {
614 let enabled = self.is_enabled();
615
616 let Self {
617 application_id,
618 store_kind,
619 recording_id,
620 store_source,
621 default_enabled: _,
622 enabled: _,
623 batcher_config,
624 batcher_hooks,
625 should_send_properties,
626 recording_info,
627 blueprint,
628 } = self;
629
630 let store_id = StoreId::new(
631 store_kind,
632 application_id,
633 recording_id.unwrap_or_else(RecordingId::random),
634 );
635 let store_source = store_source.unwrap_or_else(|| StoreSource::RustSdk {
636 rustc_version: env!("RE_BUILD_RUSTC_VERSION").into(),
637 llvm_version: env!("RE_BUILD_LLVM_VERSION").into(),
638 });
639
640 let store_info = StoreInfo::new(store_id, store_source);
641
642 (
643 enabled,
644 store_info,
645 should_send_properties.then_some(recording_info),
646 batcher_config,
647 batcher_hooks,
648 blueprint,
649 )
650 }
651
652 fn create_recording_stream(
653 self,
654 function_name: &'static str,
655 sink_factory: impl FnOnce() -> RecordingStreamResult<Box<dyn LogSink>>,
656 ) -> RecordingStreamResult<RecordingStream> {
657 let (enabled, store_info, properties, batcher_config, batcher_hooks, blueprint_opts) =
658 self.into_args();
659 if enabled {
660 let stream = RecordingStream::new(
661 store_info,
662 properties,
663 batcher_config,
664 batcher_hooks,
665 sink_factory()?,
666 )?;
667 if let Some(blueprint_opts) = blueprint_opts {
668 blueprint_opts.send(&stream)?;
669 }
670 Ok(stream)
671 } else {
672 re_log::debug!("Rerun disabled - call to {function_name}() ignored");
673 Ok(RecordingStream::disabled())
674 }
675 }
676
677 /// Internal check for whether or not logging is enabled using explicit/default settings & env var.
678 fn is_enabled(&self) -> bool {
679 self.enabled
680 .unwrap_or_else(|| crate::decide_logging_enabled(self.default_enabled))
681 }
682}
683
684// ----------------------------------------------------------------------------
685
686/// A [`RecordingStream`] handles everything related to logging data into Rerun.
687///
688/// You can construct a new [`RecordingStream`] using [`RecordingStreamBuilder`] or
689/// [`RecordingStream::new`].
690///
691/// ## Sinks
692///
693/// Data is logged into Rerun via [`LogSink`]s.
694///
695/// The underlying [`LogSink`] of a [`RecordingStream`] can be changed at any point during its
696/// lifetime by calling [`RecordingStream::set_sink`] or one of the higher level helpers
697/// ([`RecordingStream::connect_grpc`], [`RecordingStream::memory`],
698/// [`RecordingStream::save`], [`RecordingStream::disconnect`]).
699///
700/// See [`RecordingStream::set_sink`] for more information.
701///
702/// ## Multithreading and ordering
703///
704/// [`RecordingStream`] can be cheaply cloned and used freely across any number of threads.
705///
706/// Internally, all operations are linearized into a pipeline:
707/// - All operations sent by a given thread will take effect in the same exact order as that
708/// thread originally sent them in, from its point of view.
709/// - There isn't any well defined global order across multiple threads.
710///
711/// This means that e.g. flushing the pipeline ([`Self::flush_blocking`]) guarantees that all previous data sent by the calling thread
712/// has been recorded and (if applicable) flushed to the underlying OS-managed file descriptor,
713/// but other threads may still have data in flight.
714///
715/// ## Shutdown
716///
717/// The [`RecordingStream`] can only be shutdown by dropping all instances of it, at which point
718/// it will automatically take care of flushing any pending data that might remain in the pipeline.
719///
720/// Shutting down cannot ever block.
#[derive(Clone)]
pub struct RecordingStream {
    /// Handle to the shared inner state: either a strong one (`Left`), or a weak one (`Right`)
    /// that has to be upgraded before use.
    inner: Either<Arc<RecordingStreamInner>, Weak<RecordingStreamInner>>,
}
725
726impl RecordingStream {
727 /// Passes a reference to the [`RecordingStreamInner`], if it exists.
728 ///
729 /// This works whether the underlying stream is strong or weak.
730 #[inline]
731 fn with<F: FnOnce(&RecordingStreamInner) -> R, R>(&self, f: F) -> Option<R> {
732 match &self.inner {
733 Either::Left(strong) => Some(f(strong)),
734 Either::Right(weak) => Some(f(&*weak.upgrade()?)),
735 }
736 }
737
738 /// Clones the [`RecordingStream`] without incrementing the refcount.
739 ///
740 /// Useful e.g. if you want to make sure that a detached thread won't prevent the [`RecordingStream`]
741 /// from flushing during shutdown.
742 //
743 // TODO(#5335): shutdown flushing behavior is too brittle.
744 #[inline]
745 pub fn clone_weak(&self) -> Self {
746 Self {
747 inner: match &self.inner {
748 Either::Left(strong) => Either::Right(Arc::downgrade(strong)),
749 Either::Right(weak) => Either::Right(Weak::clone(weak)),
750 },
751 }
752 }
753
754 /// Returns the current reference count of the [`RecordingStream`].
755 ///
756 /// Returns 0 if the stream was created by [`RecordingStream::disabled()`],
757 /// or if it is a [`clone_weak()`][Self::clone_weak] of a stream whose strong instances
758 /// have all been dropped.
759 pub fn ref_count(&self) -> usize {
760 match &self.inner {
761 Either::Left(strong) => Arc::strong_count(strong),
762 Either::Right(weak) => weak.strong_count(),
763 }
764 }
765}
766
767// TODO(#5335): shutdown flushing behavior is too brittle.
768impl Drop for RecordingStream {
769 #[inline]
770 fn drop(&mut self) {
771 // If this holds the last strong handle to the recording, make sure that all pending
772 // importer threads that were started from the SDK actually run to completion (they
773 // all hold a weak handle to this very recording!).
774 //
775 // NOTE: It's very important to do so from the `Drop` implementation of `RecordingStream`
776 // itself, because the importer threads -- by definition -- will have to send data into
777 // this very recording, therefore we must make sure that at least one strong handle still lives
778 // on until they are all finished.
779 if let Either::Left(strong) = &mut self.inner
780 && Arc::strong_count(strong) == 1
781 {
782 // Keep the recording alive until all importers are finished.
783 self.with(|inner| inner.wait_for_importers());
784 }
785 }
786}
787
/// The shared state behind every handle to a [`RecordingStream`].
struct RecordingStreamInner {
    /// Information about the store this stream logs to.
    store_info: StoreInfo,

    /// Recording properties, if any were provided at creation.
    recording_info: Option<RecordingInfo>,

    // NOTE(review): presumably a monotonically increasing log tick counter -- confirm against usage.
    tick: AtomicI64,

    /// The one and only entrypoint into the pipeline: this is _never_ cloned nor publicly exposed,
    /// therefore the `Drop` implementation is guaranteed that no more data can come in while it's
    /// running.
    cmds_tx: re_quota_channel::Sender<Command>,

    /// The batcher; flushed one last time when this struct is dropped.
    batcher: ChunkBatcher,

    /// Join handle of a background thread; joined when this struct is dropped.
    batcher_to_sink_handle: Option<std::thread::JoinHandle<()>>,

    /// Mirror of the batcher's currently active configuration.
    current_batcher_config: Mutex<ChunkBatcherConfig>,

    /// If true, any new sink will update the batcher's configuration (as far as possible).
    sink_dependent_batcher_config: bool,

    /// Keeps track of the top-level threads that were spawned in order to execute the importer
    /// machinery in the context of this `RecordingStream`.
    ///
    /// See [`RecordingStream::log_file_from_path`] and [`RecordingStream::log_file_from_contents`].
    importer_handles: Mutex<Vec<std::thread::JoinHandle<()>>>,

    // NOTE(review): presumably compared against the current pid to detect forks -- confirm
    // against `is_forked_child`.
    pid_at_creation: u32,
}
815
816impl fmt::Debug for RecordingStreamInner {
817 #[inline]
818 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
819 f.debug_struct("RecordingStreamInner")
820 .field("store_id", &self.store_info.store_id)
821 .finish_non_exhaustive()
822 }
823}
824
impl Drop for RecordingStreamInner {
    /// Shuts the pipeline down: waits for importers, flushes the batcher, sends the final
    /// commands and joins the background thread.
    ///
    /// None of this happens in a forked child process, where only an error is logged.
    fn drop(&mut self) {
        // In a forked child, skip the shutdown sequence below entirely.
        if self.is_forked_child() {
            re_log::error_once!(
                "Fork detected while dropping RecordingStreamInner. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
            );
            return;
        }

        self.wait_for_importers();

        // NOTE: The command channel is private, if we're here, nothing is currently capable of
        // sending data down the pipeline.
        // The flush is given an unbounded amount of time to complete.
        let timeout = Duration::MAX;
        if let Err(err) = self.batcher.flush_blocking(timeout) {
            re_log::error!("Failed to flush batcher: {err}");
        }

        // Best effort: failures to send these final commands are deliberately ignored.
        self.cmds_tx.send(Command::PopPendingChunks).ok();
        self.cmds_tx.send(Command::Shutdown).ok();

        // Wait for the background thread to finish, ignoring whether it panicked.
        if let Some(handle) = self.batcher_to_sink_handle.take() {
            handle.join().ok();
        }
    }
}
849
850fn resolve_batcher_config(
851 batcher_config: Option<ChunkBatcherConfig>,
852 sink: &dyn LogSink,
853) -> ChunkBatcherConfig {
854 if let Some(explicit_batcher_config) = batcher_config {
855 explicit_batcher_config
856 } else {
857 let default_config = sink.default_batcher_config();
858 default_config.apply_env().unwrap_or_else(|err| {
859 re_log::error!("Failed to parse ChunkBatcherConfig from env: {err}");
860 default_config
861 })
862 }
863}
864
865/// Warns once if `sink` defers finalization to shutdown (e.g. a file sink) and is paired with a
866/// flush-on-every-row batcher config such as [`ChunkBatcherConfig::ALWAYS_TEST_ONLY`].
867///
868/// These sinks have to keep per-chunk metadata in memory until the footer can be written at
869/// process exit. A flush-on-every-row config produces one chunk per row, so for long-running
870/// recordings this can drive memory usage through the roof.
871fn warn_if_problematic_file_sink_config(config: &ChunkBatcherConfig, sink: &dyn LogSink) {
872 if !sink.defers_finalization_to_shutdown() {
873 return;
874 }
875
876 if !config.always_flushes() {
877 return;
878 }
879
880 // Snippet-roundtrip tests intentionally pair this config with a file sink to exercise the
881 // per-row serialization path. The warning is correct in principle but noisy (and fatal under
882 // `RERUN_PANIC_ON_WARN`) for that controlled setup, so suppress it when the test harness
883 // signals strict-test mode.
884 if re_log::env_var_is_truthy("RERUN_STRICT") {
885 return;
886 }
887
888 re_log::warn_once!(
889 "ChunkBatcherConfig::ALWAYS_TEST_ONLY (or an equivalent flush-on-every-row config) is \
890 being used with a file sink. This produces one chunk per row, and the file's footer \
891 cannot be written until the SDK process exits — so per-chunk metadata accumulates in \
892 memory for the entire lifetime of the recording, which can blow up memory usage. \
893 Use the default `ChunkBatcherConfig` for production workloads, or \
894 `ChunkBatcherConfig::LOW_LATENCY` if you need fast flushing."
895 );
896}
897
898impl RecordingStreamInner {
899 fn new(
900 store_info: StoreInfo,
901 recording_info: Option<RecordingInfo>,
902 batcher_config: Option<ChunkBatcherConfig>,
903 batcher_hooks: BatcherHooks,
904 sink: Box<dyn LogSink>,
905 ) -> RecordingStreamResult<Self> {
906 let sink_dependent_batcher_config = batcher_config.is_none();
907 let batcher_config = resolve_batcher_config(batcher_config, &*sink);
908
909 warn_if_problematic_file_sink_config(&batcher_config, &*sink);
910
911 let on_release = batcher_hooks.on_release.clone();
912 let batcher = ChunkBatcher::new(batcher_config, batcher_hooks)?;
913
914 {
915 re_log::debug!(
916 store_id = ?store_info.store_id,
917 "Setting StoreInfo",
918 );
919 sink.send(
920 re_log_types::SetStoreInfo {
921 row_id: *RowId::new(),
922 info: store_info.clone(),
923 }
924 .into(),
925 );
926 }
927
928 let (cmds_tx, cmds_rx) = re_quota_channel::channel(
929 "RecordingStream::cmds",
930 batcher_config.max_bytes_in_flight / 2,
931 );
932
933 let batcher_to_sink_handle = {
934 const NAME: &str = "RecordingStream::batcher_to_sink";
935 std::thread::Builder::new()
936 .name(NAME.into())
937 .spawn({
938 let info = store_info.clone();
939 let batcher = batcher.clone();
940 move || forwarding_thread(info, sink, cmds_rx, batcher.chunks(), on_release)
941 })
942 .map_err(|err| RecordingStreamError::SpawnThread {
943 name: NAME.into(),
944 err,
945 })?
946 };
947
948 if let Some(recording_info) = recording_info.as_ref() {
949 // We pre-populate the batcher with a chunk the contains the `RecordingInfo`
950 // so that these get automatically sent to the sink.
951
952 re_log::trace!(recording_info = ?recording_info, "Adding RecordingInfo to batcher");
953
954 let chunk = Chunk::builder(EntityPath::properties())
955 .with_archetype(RowId::new(), TimePoint::default(), recording_info)
956 .build()?;
957
958 batcher.push_chunk(chunk);
959 }
960
961 Ok(Self {
962 store_info,
963 recording_info,
964 tick: AtomicI64::new(0),
965 cmds_tx,
966 batcher,
967 batcher_to_sink_handle: Some(batcher_to_sink_handle),
968 current_batcher_config: Mutex::new(batcher_config),
969 sink_dependent_batcher_config,
970 importer_handles: Mutex::new(Vec::new()),
971 pid_at_creation: std::process::id(),
972 })
973 }
974
975 #[inline]
976 pub fn is_forked_child(&self) -> bool {
977 self.pid_at_creation != std::process::id()
978 }
979
980 /// Make sure all pending top-level importer threads that were started from the SDK run to completion.
981 //
982 // TODO(cmc): At some point we might want to make it configurable, though I cannot really
983 // think of a use case where you'd want to drop those threads immediately upon
984 // disconnection.
985 fn wait_for_importers(&self) {
986 let importer_handles = std::mem::take(&mut *self.importer_handles.lock());
987 for handle in importer_handles {
988 handle.join().ok();
989 }
990 }
991}
992
/// One-shot closure carried by [`Command::InspectSink`]; it is called with a shared reference
/// to the current sink.
type InspectSinkFn = Box<dyn FnOnce(&dyn LogSink) + Send + 'static>;

/// Outcome of a sink flush, as sent back through the `on_done` channel of [`Command::Flush`].
type FlushResult = Result<(), SinkFlushError>;
996
/// Commands sent over the command channel to the batcher-to-sink forwarding thread.
enum Command {
    /// Send a ready-made [`LogMsg`] to the current sink.
    RecordMsg(LogMsg),

    /// Replace the current sink with `new_sink`.
    ///
    /// The old sink's backlog is drained and the old sink is flushed (bounded by `timeout`);
    /// the new sink then receives the store info followed by that backlog.
    SwapSink {
        new_sink: Box<dyn LogSink>,
        timeout: Duration,
    },

    /// Run a closure against the current sink.
    // TODO(#10444): This should go away with more explicit sinks.
    InspectSink(InspectSinkFn),

    /// Flush the current sink (bounded by `timeout`) and report the result on `on_done`.
    Flush {
        on_done: Sender<FlushResult>,
        timeout: Duration,
    },

    /// Drop any sinks whose on-disk format only finalizes at shutdown (i.e. file-like sinks with
    /// footers), while leaving streaming sinks (e.g. gRPC) untouched. Used by Python's
    /// `RecordingStream.__exit__` to ensure file-backed recordings are consumable as soon as
    /// the `with`-block exits, without waiting for GC.
    FinalizeDeferredSinks {
        on_done: Sender<()>,
        timeout: Duration,
    },

    /// Wake the forwarding thread so that it drains all pending chunks before handling the
    /// next command.
    PopPendingChunks,

    /// Stop the forwarding thread.
    Shutdown,
}
1021
1022impl std::fmt::Debug for Command {
1023 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1024 match self {
1025 Self::RecordMsg(msg) => f.debug_tuple("RecordMsg").field(msg).finish(),
1026 Self::SwapSink { .. } => f.debug_struct("SwapSink").finish_non_exhaustive(),
1027 Self::InspectSink(_) => f.debug_tuple("InspectSink").finish_non_exhaustive(),
1028 Self::Flush { .. } => f.debug_struct("Flush").finish_non_exhaustive(),
1029 Self::FinalizeDeferredSinks { .. } => f
1030 .debug_struct("FinalizeDeferredSinks")
1031 .finish_non_exhaustive(),
1032 Self::PopPendingChunks => write!(f, "PopPendingChunks"),
1033 Self::Shutdown => write!(f, "Shutdown"),
1034 }
1035 }
1036}
1037
1038impl re_byte_size::SizeBytes for Command {
1039 fn heap_size_bytes(&self) -> u64 {
1040 match self {
1041 Self::RecordMsg(msg) => msg.heap_size_bytes(),
1042 Self::SwapSink { .. }
1043 | Self::InspectSink(_)
1044 | Self::Flush { .. }
1045 | Self::FinalizeDeferredSinks { .. }
1046 | Self::PopPendingChunks
1047 | Self::Shutdown => 0,
1048 }
1049 }
1050}
1051
1052impl Command {
1053 fn flush(timeout: Duration) -> (Self, Receiver<FlushResult>) {
1054 let (on_done, rx) = crossbeam::channel::bounded(1); // oneshot
1055 (Self::Flush { on_done, timeout }, rx)
1056 }
1057
1058 fn finalize_deferred_sinks(timeout: Duration) -> (Self, Receiver<()>) {
1059 let (on_done, rx) = crossbeam::channel::bounded(1); // oneshot
1060 (Self::FinalizeDeferredSinks { on_done, timeout }, rx)
1061 }
1062}
1063
1064impl RecordingStream {
1065 /// Creates a new [`RecordingStream`] with a given [`StoreInfo`] and [`LogSink`].
1066 ///
1067 /// You can create a [`StoreInfo`] with [`crate::new_store_info`];
1068 ///
1069 /// The [`StoreInfo`] is immediately sent to the sink in the form of a
1070 /// [`re_log_types::SetStoreInfo`].
1071 ///
1072 /// You can find sinks in [`crate::sink`].
1073 ///
1074 /// If no batcher configuration is provided, the default batcher configuration for the sink will be used.
1075 /// Any environment variables as specified in [`ChunkBatcherConfig`] will always override respective settings.
1076 ///
1077 /// See also: [`RecordingStreamBuilder`].
1078 #[must_use = "Recording will get closed automatically once all instances of this object have been dropped"]
1079 pub fn new(
1080 store_info: StoreInfo,
1081 recording_info: Option<RecordingInfo>,
1082 batcher_config: Option<ChunkBatcherConfig>,
1083 batcher_hooks: BatcherHooks,
1084 sink: Box<dyn LogSink>,
1085 ) -> RecordingStreamResult<Self> {
1086 let sink = store_info
1087 .store_id
1088 .is_recording()
1089 .then(forced_sink_path)
1090 .flatten()
1091 .map_or(sink, |path| {
1092 re_log::info!("Forcing FileSink because of env-var {ENV_FORCE_SAVE}={path:?}");
1093 Box::new(
1094 crate::sink::FileSink::new(path)
1095 .expect("Failed to create FileSink for forced test path"),
1096 ) as Box<dyn LogSink>
1097 });
1098
1099 let stream = RecordingStreamInner::new(
1100 store_info,
1101 recording_info,
1102 batcher_config,
1103 batcher_hooks,
1104 sink,
1105 )
1106 .map(|inner| Self {
1107 inner: Either::Left(Arc::new(inner)),
1108 })?;
1109
1110 Ok(stream)
1111 }
1112
1113 /// Creates a new no-op [`RecordingStream`] that drops all logging messages, doesn't allocate
1114 /// any memory and doesn't spawn any threads.
1115 ///
1116 /// [`Self::is_enabled`] will return `false`.
1117 pub const fn disabled() -> Self {
1118 Self {
1119 inner: Either::Right(Weak::new()),
1120 }
1121 }
1122}
1123
1124impl RecordingStream {
1125 /// Log data to Rerun.
1126 ///
1127 /// This is the main entry point for logging data to rerun. It can be used to log anything
1128 /// that implements the [`AsComponents`], such as any [archetype](https://docs.rs/rerun/latest/rerun/archetypes/index.html)
1129 /// or individual [component](https://docs.rs/rerun/latest/rerun/components/index.html).
1130 ///
1131 /// The data will be timestamped automatically based on the [`RecordingStream`]'s internal clock.
1132 /// See [`RecordingStream::set_time_sequence`] etc for more information.
1133 ///
1134 /// The entity path can either be a string
1135 /// (with special characters escaped, split on unescaped slashes)
1136 /// or an [`EntityPath`] constructed with [`crate::entity_path`].
1137 /// See <https://www.rerun.io/docs/concepts/logging-and-ingestion/entity-path> for more on entity paths.
1138 ///
1139 /// See also: [`Self::log_static`] for logging static data.
1140 ///
1141 /// Internally, the stream will automatically micro-batch multiple log calls to optimize
1142 /// transport.
1143 /// See [SDK Micro Batching] for more information.
1144 ///
1145 /// # Example:
1146 /// ```ignore
1147 /// # use rerun;
1148 /// # let (rec, storage) = rerun::RecordingStreamBuilder::new("rerun_example_points3d_simple").memory()?;
1149 /// rec.log(
1150 /// "my/points",
1151 /// &rerun::Points3D::new([(0.0, 0.0, 0.0), (1.0, 1.0, 1.0)]),
1152 /// )?;
1153 /// # Ok::<(), Box<dyn std::error::Error>>(())
1154 /// ```
1155 ///
1156 /// # Thread Safety
1157 ///
1158 /// While [`RecordingStream`] is `Send + Sync` and safe to use from multiple threads,
1159 /// **avoid calling `log` while holding a [`std::sync::Mutex`]**. The rerun SDK uses
1160 /// [rayon](https://docs.rs/rayon) internally for parallel processing, and rayon's
1161 /// work-stealing behavior can cause deadlocks when combined with held mutexes
1162 /// (see [rayon#592](https://github.com/rayon-rs/rayon/issues/592)).
1163 ///
1164 /// ```ignore
1165 /// // ❌ Don't do this - potential deadlock:
1166 /// let guard = mutex.lock().unwrap();
1167 /// stream.log("data", &rerun::Points3D::new(points))?;
1168 /// drop(guard);
1169 ///
1170 /// // ✅ Do this instead - extract data first:
1171 /// let points = {
1172 /// let guard = mutex.lock().unwrap();
1173 /// guard.points.clone()
1174 /// };
1175 /// stream.log("data", &rerun::Points3D::new(points))?;
1176 /// ```
1177 ///
1178 /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
1179 /// [component bundle]: [`AsComponents`]
1180 #[inline]
1181 pub fn log<AS: ?Sized + AsComponents>(
1182 &self,
1183 ent_path: impl Into<EntityPath>,
1184 as_components: &AS,
1185 ) -> RecordingStreamResult<()> {
1186 self.log_with_static(ent_path, false, as_components)
1187 }
1188
1189 /// Lower-level logging API to provide data spanning multiple timepoints.
1190 ///
1191 /// Unlike the regular `log` API, which is row-oriented, this API lets you submit the data
1192 /// in a columnar form. The lengths of all of the [`TimeColumn`] and the component columns
1193 /// must match. All data that occurs at the same index across the different index/time and components
1194 /// arrays will act as a single logical row.
1195 ///
1196 /// Note that this API ignores any stateful index/time set on the log stream via the
1197 /// [`Self::set_time`]/[`Self::set_timepoint`]/etc. APIs.
1198 /// Furthermore, this will _not_ inject the default timelines `log_tick` and `log_time` timeline columns.
1199 pub fn send_columns(
1200 &self,
1201 ent_path: impl Into<EntityPath>,
1202 indexes: impl IntoIterator<Item = TimeColumn>,
1203 columns: impl IntoIterator<Item = SerializedComponentColumn>,
1204 ) -> RecordingStreamResult<()> {
1205 let id = ChunkId::new();
1206
1207 let indexes = indexes
1208 .into_iter()
1209 .map(|col| (*col.timeline().name(), col))
1210 .collect();
1211
1212 let components: ChunkComponents = columns
1213 .into_iter()
1214 .map(|column| (column.descriptor, column.list_array))
1215 .collect();
1216
1217 let chunk = Chunk::from_auto_row_ids(id, ent_path.into(), indexes, components)?;
1218
1219 self.send_chunk(chunk);
1220
1221 Ok(())
1222 }
1223
1224 /// Log data to Rerun.
1225 ///
1226 /// It can be used to log anything
1227 /// that implements the [`AsComponents`], such as any [archetype](https://docs.rs/rerun/latest/rerun/archetypes/index.html)
1228 /// or individual [component](https://docs.rs/rerun/latest/rerun/components/index.html).
1229 ///
1230 /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
1231 /// any temporal data of the same type.
1232 /// All timestamp data associated with this message will be dropped right before sending it to Rerun.
1233 ///
1234 /// This is most often used for [`rerun::ViewCoordinates`](https://docs.rs/rerun/latest/rerun/archetypes/struct.ViewCoordinates.html) and
1235 /// [`rerun::AnnotationContext`](https://docs.rs/rerun/latest/rerun/archetypes/struct.AnnotationContext.html).
1236 ///
1237 /// Internally, the stream will automatically micro-batch multiple log calls to optimize
1238 /// transport.
1239 /// See [SDK Micro Batching] for more information.
1240 ///
1241 /// See also [`Self::log`].
1242 ///
1243 /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
1244 /// [component bundle]: [`AsComponents`]
1245 #[inline]
1246 pub fn log_static<AS: ?Sized + AsComponents>(
1247 &self,
1248 ent_path: impl Into<EntityPath>,
1249 as_components: &AS,
1250 ) -> RecordingStreamResult<()> {
1251 self.log_with_static(ent_path, true, as_components)
1252 }
1253
1254 /// Logs the contents of a [component bundle] into Rerun.
1255 ///
1256 /// If `static_` is set to `true`, all timestamp data associated with this message will be
1257 /// dropped right before sending it to Rerun.
1258 /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
1259 /// any temporal data of the same type.
1260 ///
1261 /// Otherwise, the data will be timestamped automatically based on the [`RecordingStream`]'s
1262 /// internal clock.
1263 /// See `RecordingStream::set_time_*` family of methods for more information.
1264 ///
1265 /// The entity path can either be a string
1266 /// (with special characters escaped, split on unescaped slashes)
1267 /// or an [`EntityPath`] constructed with [`crate::entity_path`].
1268 /// See <https://www.rerun.io/docs/concepts/logging-and-ingestion/entity-path> for more on entity paths.
1269 ///
1270 /// Internally, the stream will automatically micro-batch multiple log calls to optimize
1271 /// transport.
1272 /// See [SDK Micro Batching] for more information.
1273 ///
1274 /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
1275 /// [component bundle]: [`AsComponents`]
1276 #[inline]
1277 pub fn log_with_static<AS: ?Sized + AsComponents>(
1278 &self,
1279 ent_path: impl Into<EntityPath>,
1280 static_: bool,
1281 as_components: &AS,
1282 ) -> RecordingStreamResult<()> {
1283 let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
1284 self.log_serialized_batches_impl(
1285 row_id,
1286 ent_path,
1287 static_,
1288 as_components.as_serialized_batches(),
1289 )
1290 }
1291
1292 /// Logs a set of [`SerializedComponentBatch`]es into Rerun.
1293 ///
1294 /// If `static_` is set to `true`, all timestamp data associated with this message will be
1295 /// dropped right before sending it to Rerun.
1296 /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
1297 /// any temporal data of the same type.
1298 ///
1299 /// Otherwise, the data will be timestamped automatically based on the [`RecordingStream`]'s
1300 /// internal clock.
1301 /// See `RecordingStream::set_time_*` family of methods for more information.
1302 ///
1303 /// The number of instances will be determined by the longest batch in the bundle.
1304 ///
1305 /// The entity path can either be a string
1306 /// (with special characters escaped, split on unescaped slashes)
1307 /// or an [`EntityPath`] constructed with [`crate::entity_path`].
1308 /// See <https://www.rerun.io/docs/concepts/logging-and-ingestion/entity-path> for more on entity paths.
1309 ///
1310 /// Internally, the stream will automatically micro-batch multiple log calls to optimize
1311 /// transport.
1312 /// See [SDK Micro Batching] for more information.
1313 ///
1314 /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
1315 ///
1316 /// [`SerializedComponentBatch`]: [re_types_core::SerializedComponentBatch]
1317 pub fn log_serialized_batches(
1318 &self,
1319 ent_path: impl Into<EntityPath>,
1320 static_: bool,
1321 comp_batches: impl IntoIterator<Item = re_sdk_types::SerializedComponentBatch>,
1322 ) -> RecordingStreamResult<()> {
1323 let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
1324 self.log_serialized_batches_impl(row_id, ent_path, static_, comp_batches)
1325 }
1326
1327 /// Sends a property to the recording.
1328 #[inline]
1329 pub fn send_property<AS: ?Sized + AsComponents>(
1330 &self,
1331 name: impl Into<String>,
1332 values: &AS,
1333 ) -> RecordingStreamResult<()> {
1334 let sub_path = EntityPath::from(name.into());
1335 self.log_static(EntityPath::properties().join(&sub_path), values)
1336 }
1337
1338 /// Sends the name of the recording.
1339 #[inline]
1340 pub fn send_recording_name(&self, name: impl Into<String>) -> RecordingStreamResult<()> {
1341 let update = RecordingInfo::update_fields().with_name(name.into());
1342 self.log_static(EntityPath::properties(), &update)
1343 }
1344
1345 /// Sends the start time of the recording.
1346 #[inline]
1347 pub fn send_recording_start_time(
1348 &self,
1349 timestamp: impl Into<Timestamp>,
1350 ) -> RecordingStreamResult<()> {
1351 let update = RecordingInfo::update_fields().with_start_time(timestamp.into());
1352 self.log_static(EntityPath::properties(), &update)
1353 }
1354
1355 // NOTE: For bw and fw compatibility reasons, we need our logging APIs to be fallible, even
1356 // though they really aren't at the moment.
1357 #[expect(clippy::unnecessary_wraps)]
1358 fn log_serialized_batches_impl(
1359 &self,
1360 row_id: RowId,
1361 entity_path: impl Into<EntityPath>,
1362 static_: bool,
1363 comp_batches: impl IntoIterator<Item = re_sdk_types::SerializedComponentBatch>,
1364 ) -> RecordingStreamResult<()> {
1365 if !self.is_enabled() {
1366 return Ok(()); // silently drop the message
1367 }
1368
1369 let entity_path = entity_path.into();
1370
1371 let components: IntMap<_, _> = comp_batches
1372 .into_iter()
1373 .map(|comp_batch| (comp_batch.descriptor.component, comp_batch))
1374 .collect();
1375
1376 // NOTE: The timepoint is irrelevant, the `RecordingStream` will overwrite it using its
1377 // internal clock.
1378 let timepoint = TimePoint::default();
1379
1380 if !components.is_empty() {
1381 let row = PendingRow {
1382 row_id,
1383 timepoint,
1384 components,
1385 };
1386 self.record_row(entity_path, row, !static_);
1387 }
1388
1389 Ok(())
1390 }
1391
1392 /// Logs the file at the given `path` using all [`re_importer::Importer`]s available.
1393 ///
1394 /// A single `path` might be handled by more than one importer.
1395 ///
1396 /// This method blocks until either at least one [`re_importer::Importer`] starts
1397 /// streaming data in or all of them fail.
1398 ///
1399 /// See <https://www.rerun.io/docs/concepts/logging-and-ingestion/importers/overview> for more information.
1400 #[cfg(feature = "importers")]
1401 pub fn log_file_from_path(
1402 &self,
1403 filepath: impl AsRef<std::path::Path>,
1404 entity_path_prefix: Option<EntityPath>,
1405 static_: bool,
1406 ) -> RecordingStreamResult<()> {
1407 self.log_file(filepath, None, entity_path_prefix, static_, true)
1408 }
1409
1410 /// Logs the given `contents` using all [`re_importer::Importer`]s available.
1411 ///
1412 /// A single `path` might be handled by more than one importer.
1413 ///
1414 /// This method blocks until either at least one [`re_importer::Importer`] starts
1415 /// streaming data in or all of them fail.
1416 ///
1417 /// See <https://www.rerun.io/docs/concepts/logging-and-ingestion/importers/overview> for more information.
1418 #[cfg(feature = "importers")]
1419 pub fn log_file_from_contents(
1420 &self,
1421 filepath: impl AsRef<std::path::Path>,
1422 contents: std::borrow::Cow<'_, [u8]>,
1423 entity_path_prefix: Option<EntityPath>,
1424 static_: bool,
1425 ) -> RecordingStreamResult<()> {
1426 self.log_file(filepath, Some(contents), entity_path_prefix, static_, true)
1427 }
1428
1429 /// If `prefer_current_recording` is set (which is always the case for now), the importer settings
1430 /// will be configured as if the current SDK recording is the currently opened recording.
1431 /// Most importers prefer logging to the currently opened recording if one is set.
1432 #[cfg(feature = "importers")]
1433 #[expect(clippy::fn_params_excessive_bools)] // private function 🤷♂️
1434 fn log_file(
1435 &self,
1436 filepath: impl AsRef<std::path::Path>,
1437 contents: Option<std::borrow::Cow<'_, [u8]>>,
1438 entity_path_prefix: Option<EntityPath>,
1439 static_: bool,
1440 prefer_current_recording: bool,
1441 ) -> RecordingStreamResult<()> {
1442 let Some(store_info) = self.store_info().clone() else {
1443 re_log::warn!(
1444 "Ignored call to log_file() because RecordingStream has not been properly initialized"
1445 );
1446 return Ok(());
1447 };
1448
1449 let filepath = filepath.as_ref();
1450 let has_contents = contents.is_some();
1451
1452 let (tx, rx) = re_log_channel::log_channel(re_log_channel::LogSource::File {
1453 path: filepath.into(),
1454 follow: false,
1455 });
1456
1457 let mut settings = crate::ImporterSettings {
1458 application_id: Some(store_info.application_id().clone()),
1459 recording_id: store_info.recording_id().clone(),
1460 opened_store_id: None,
1461 force_store_info: false,
1462 entity_path_prefix,
1463 follow: false,
1464 timepoint: (!static_).then(|| {
1465 self.with(|inner| {
1466 // Get the current time on all timelines, for the current recording, on the current
1467 // thread…
1468 let mut now = self.now();
1469
1470 // …and then also inject the current recording tick into it.
1471 let tick = inner
1472 .tick
1473 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1474 now.insert_cell(TimelineName::log_tick(), TimeCell::from_sequence(tick));
1475
1476 now
1477 })
1478 .unwrap_or_default()
1479 }),
1480 timestamp_offset_ns: None,
1481 timeline_type: re_log_types::TimeType::TimestampNs,
1482 };
1483
1484 if prefer_current_recording {
1485 settings.opened_store_id = Some(store_info.store_id);
1486 }
1487
1488 if let Some(contents) = contents {
1489 re_importer::import_from_file_contents(
1490 &settings,
1491 re_log_types::FileSource::Sdk,
1492 filepath,
1493 contents,
1494 &tx,
1495 )?;
1496 } else {
1497 re_importer::import_from_path(&settings, re_log_types::FileSource::Sdk, filepath, &tx)?;
1498 }
1499 drop(tx);
1500
1501 // We can safely ignore the error on `recv()` as we're in complete control of both ends of
1502 // the channel.
1503 let thread_name = if has_contents {
1504 format!("log_file_from_contents({filepath:?})")
1505 } else {
1506 format!("log_file_from_path({filepath:?})")
1507 };
1508 let handle = std::thread::Builder::new()
1509 .name(thread_name.clone())
1510 .spawn({
1511 let this = self.clone_weak();
1512 move || {
1513 while let Some(msg) = rx.recv().ok().and_then(|msg| msg.into_data()) {
1514 match msg {
1515 re_log_channel::DataSourceMessage::LogMsg(log_msg) => {
1516 this.record_msg(log_msg);
1517 }
1518 unsupported => {
1519 re_log::error_once!(
1520 "Ignoring unexpected {} in file",
1521 unsupported.variant_name()
1522 );
1523 }
1524 }
1525 }
1526 }
1527 })
1528 .map_err(|err| RecordingStreamError::SpawnThread {
1529 name: thread_name,
1530 err,
1531 })?;
1532
1533 self.with(|inner| inner.importer_handles.lock().push(handle));
1534
1535 Ok(())
1536 }
1537}
1538
1539#[expect(clippy::needless_pass_by_value)]
1540fn forwarding_thread(
1541 store_info: StoreInfo,
1542 mut sink: Box<dyn LogSink>,
1543 cmds_rx: re_quota_channel::Receiver<Command>,
1544 chunks: re_quota_channel::Receiver<Chunk>,
1545 on_release: Option<ArrowRecordBatchReleaseCallback>,
1546) {
1547 /// Returns `true` to indicate that processing can continue; i.e. `false` means immediate
1548 /// shutdown.
1549 fn handle_cmd(store_info: &StoreInfo, cmd: Command, sink: &mut Box<dyn LogSink>) -> bool {
1550 match cmd {
1551 Command::RecordMsg(msg) => {
1552 sink.send(msg);
1553 }
1554 Command::SwapSink { new_sink, timeout } => {
1555 re_log::trace!("Swapping sink…");
1556
1557 let backlog = {
1558 // Capture the backlog if it exists.
1559 let backlog = sink.drain_backlog();
1560
1561 // Flush the underlying sink if possible.
1562 if let Err(err) = sink.flush_blocking(timeout) {
1563 re_log::error!("Failed to flush previous sink: {err}");
1564 }
1565
1566 backlog
1567 };
1568
1569 // Send the recording info to the new sink. This is idempotent.
1570 {
1571 re_log::debug!(
1572 store_id = ?store_info.store_id,
1573 "Setting StoreInfo",
1574 );
1575 new_sink.send(
1576 re_log_types::SetStoreInfo {
1577 row_id: *RowId::new(),
1578 info: store_info.clone(),
1579 }
1580 .into(),
1581 );
1582 new_sink.send_all(backlog);
1583 }
1584
1585 *sink = new_sink;
1586 }
1587 Command::InspectSink(f) => {
1588 f(sink.as_ref());
1589 }
1590 Command::Flush { on_done, timeout } => {
1591 re_log::trace!("Flushing…");
1592
1593 let result = sink.flush_blocking(timeout);
1594
1595 // Send back the result:
1596 if let Err(crossbeam::channel::SendError(result)) = send_crossbeam(&on_done, result)
1597 && let Err(err) = result
1598 {
1599 // There was an error, and nobody received it:
1600 re_log::error!("Failed to flush sink: {err}");
1601 }
1602 }
1603 Command::FinalizeDeferredSinks { on_done, timeout } => {
1604 if sink.defers_finalization_to_shutdown() && !sink.finalize_deferred_in_place() {
1605 // Composite sinks handle finalization themselves; otherwise the entire
1606 // sink defers finalization (e.g. a bare FileSink), so we have to swap it
1607 // out for a BufferedSink — dropping the old sink runs its writer thread
1608 // to completion and emits the footer.
1609 let backlog = sink.drain_backlog();
1610 if let Err(err) = sink.flush_blocking(timeout) {
1611 re_log::error!("Failed to flush previous sink during finalize: {err}");
1612 }
1613
1614 let new_sink: Box<dyn LogSink> = Box::new(crate::log_sink::BufferedSink::new());
1615 new_sink.send(
1616 re_log_types::SetStoreInfo {
1617 row_id: *RowId::new(),
1618 info: store_info.clone(),
1619 }
1620 .into(),
1621 );
1622 new_sink.send_all(backlog);
1623 *sink = new_sink;
1624 }
1625 send_crossbeam(&on_done, ()).ok();
1626 }
1627 Command::PopPendingChunks => {
1628 // Wake up and skip the current iteration so that we can drain all pending chunks
1629 // before handling the next command.
1630 }
1631 Command::Shutdown => return false,
1632 }
1633
1634 true
1635 }
1636
1637 loop {
1638 // NOTE: Always pop chunks first, this is what makes `Command::PopPendingChunks` possible,
1639 // which in turns makes `RecordingStream::flush_blocking` well defined.
1640 while let Ok(chunk) = chunks.try_recv() {
1641 let mut msg = match chunk.to_arrow_msg() {
1642 Ok(chunk) => chunk,
1643 Err(err) => {
1644 re_log::error!(%err, "couldn't serialize chunk; data dropped (this is a bug in Rerun!)");
1645 continue;
1646 }
1647 };
1648 msg.on_release = on_release.clone();
1649 sink.send(LogMsg::ArrowMsg(store_info.store_id.clone(), msg));
1650 }
1651
1652 re_quota_channel::select! {
1653 recv(chunks) -> res => {
1654 let Ok(chunk) = res else {
1655 // The batcher is gone, which can only happen if the `RecordingStream` itself
1656 // has been dropped.
1657 re_log::trace!("Shutting down forwarding_thread: batcher is gone");
1658 break;
1659 };
1660
1661 let msg = match chunk.to_arrow_msg() {
1662 Ok(chunk) => chunk,
1663 Err(err) => {
1664 re_log::error!(%err, "couldn't serialize chunk; data dropped (this is a bug in Rerun!)");
1665 continue;
1666 }
1667 };
1668
1669 sink.send(LogMsg::ArrowMsg(store_info.store_id.clone(), msg));
1670 }
1671
1672 recv(cmds_rx) -> res => {
1673 let Ok(cmd) = res else {
1674 // All command senders are gone, which can only happen if the
1675 // `RecordingStream` itself has been dropped.
1676 re_log::trace!("Shutting down forwarding_thread: all command senders are gone");
1677 break;
1678 };
1679 if !handle_cmd(&store_info, cmd, &mut sink) {
1680 break; // shutdown
1681 }
1682 }
1683 }
1684
1685 // NOTE: The receiving end of the command stream is owned solely by this thread.
1686 // Past this point, all command writes will return `ErrDisconnected`.
1687 }
1688}
1689
1690impl RecordingStream {
1691 /// Check if logging is enabled on this `RecordingStream`.
1692 ///
1693 /// If not, all recording calls will be ignored.
1694 #[inline]
1695 pub fn is_enabled(&self) -> bool {
1696 self.with(|_| true).unwrap_or(false)
1697 }
1698
1699 /// The [`StoreInfo`] associated with this `RecordingStream`.
1700 #[inline]
1701 pub fn store_info(&self) -> Option<StoreInfo> {
1702 self.with(|inner| inner.store_info.clone())
1703 }
1704
1705 /// Determine whether a fork has happened since creating this `RecordingStream`. In general, this means our
1706 /// batcher/sink threads are gone and all data logged since the fork has been dropped.
1707 ///
1708 /// It is essential that [`crate::cleanup_if_forked_child`] be called after forking the process. SDK-implementations
1709 /// should do this during their initialization phase.
1710 #[inline]
1711 pub fn is_forked_child(&self) -> bool {
1712 self.with(|inner| inner.is_forked_child()).unwrap_or(false)
1713 }
1714}
1715
1716impl RecordingStream {
1717 /// Records an arbitrary [`LogMsg`].
1718 #[inline]
1719 pub fn record_msg(&self, msg: LogMsg) {
1720 let f = move |inner: &RecordingStreamInner| {
1721 // NOTE: Internal channels can never be closed outside of the `Drop` impl, this send cannot
1722 // fail.
1723 inner.cmds_tx.send(Command::RecordMsg(msg)).ok();
1724 inner
1725 .tick
1726 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1727 };
1728
1729 if self.with(f).is_none() {
1730 re_log::warn_once!("Recording disabled - call to record_msg() ignored");
1731 }
1732 }
1733
1734 /// Records a single [`PendingRow`].
1735 ///
1736 /// If `inject_time` is set to `true`, the row's timestamp data will be overridden using the
1737 /// [`RecordingStream`]'s internal clock.
1738 ///
1739 /// Internally, incoming [`PendingRow`]s are automatically coalesced into larger [`Chunk`]s to
1740 /// optimize for transport.
1741 #[inline]
1742 pub fn record_row(&self, entity_path: EntityPath, mut row: PendingRow, inject_time: bool) {
1743 let f = move |inner: &RecordingStreamInner| {
1744 // NOTE: We're incrementing the current tick still.
1745 let tick = inner
1746 .tick
1747 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1748 if inject_time {
1749 // Get the current time on all timelines, for the current recording, on the current
1750 // thread…
1751 let mut now = self.now();
1752 // …and then also inject the current recording tick into it.
1753 now.insert_cell(TimelineName::log_tick(), TimeCell::from_sequence(tick));
1754
1755 // Inject all these times into the row, overriding conflicting times, if any.
1756 for (timeline, cell) in now {
1757 row.timepoint.insert_cell(timeline, cell);
1758 }
1759 }
1760
1761 inner.batcher.push_row(entity_path, row);
1762 };
1763
1764 if self.with(f).is_none() {
1765 re_log::warn_once!("Recording disabled - call to record_row() ignored");
1766 }
1767 }
1768
1769 /// Logs a single [`Chunk`].
1770 ///
1771 /// Will inject `log_tick` and `log_time` timeline columns into the chunk.
1772 /// If you don't want to inject these, use [`Self::send_chunk`] instead.
1773 #[inline]
1774 pub fn log_chunk(&self, mut chunk: Chunk) {
1775 let f = move |inner: &RecordingStreamInner| {
1776 // TODO(cmc): Repeating these values is pretty wasteful. Would be nice to have a way of
1777 // indicating these are fixed across the whole chunk.
1778 // Inject the log time
1779 {
1780 let time_timeline = Timeline::log_time();
1781 let time =
1782 TimeInt::new_temporal(re_log_types::Timestamp::now().nanos_since_epoch());
1783
1784 let repeated_time = std::iter::repeat_n(time.as_i64(), chunk.num_rows()).collect();
1785
1786 let time_column = TimeColumn::new(Some(true), time_timeline, repeated_time);
1787
1788 if let Err(err) = chunk.add_timeline(time_column) {
1789 re_log::error!(
1790 "Couldn't inject '{}' timeline into chunk (this is a bug in Rerun!): {}",
1791 time_timeline.name(),
1792 err
1793 );
1794 return;
1795 }
1796 }
1797 // Inject the log tick
1798 {
1799 let tick_timeline = Timeline::log_tick();
1800
1801 let tick = inner
1802 .tick
1803 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1804
1805 let repeated_tick = std::iter::repeat_n(tick, chunk.num_rows()).collect();
1806
1807 let tick_chunk = TimeColumn::new(Some(true), tick_timeline, repeated_tick);
1808
1809 if let Err(err) = chunk.add_timeline(tick_chunk) {
1810 re_log::error!(
1811 "Couldn't inject '{}' timeline into chunk (this is a bug in Rerun!): {}",
1812 tick_timeline.name(),
1813 err
1814 );
1815 return;
1816 }
1817 }
1818
1819 inner.batcher.push_chunk(chunk);
1820 };
1821
1822 if self.with(f).is_none() {
1823 re_log::warn_once!("Recording disabled - call to log_chunk() ignored");
1824 }
1825 }
1826
1827 /// Logs multiple [`Chunk`]s.
1828 ///
1829 /// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunk,
1830 /// for that use [`Self::log_chunks`].
1831 pub fn log_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
1832 for chunk in chunks {
1833 self.log_chunk(chunk);
1834 }
1835 }
1836
1837 /// Records a single [`Chunk`].
1838 ///
1839 /// Will inject `log_tick` and `log_time` timeline columns into the chunk.
1840 /// If you don't want to inject these, use [`Self::send_chunks`] instead.
1841 #[inline]
1842 pub fn send_chunk(&self, chunk: Chunk) {
1843 let f = move |inner: &RecordingStreamInner| {
1844 inner.batcher.push_chunk(chunk);
1845 };
1846
1847 if self.with(f).is_none() {
1848 re_log::warn_once!("Recording disabled - call to send_chunk() ignored");
1849 }
1850 }
1851
1852 /// Records multiple [`Chunk`]s.
1853 ///
1854 /// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunk,
1855 /// for that use [`Self::log_chunks`].
1856 pub fn send_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
1857 for chunk in chunks {
1858 self.send_chunk(chunk);
1859 }
1860 }
1861
1862 /// Swaps the underlying sink for a new one.
1863 ///
1864 /// This guarantees that:
1865 /// 1. all pending rows and chunks are batched, collected and sent down the current sink,
1866 /// 2. the current sink is flushed if it has pending data in its buffers,
1867 /// 3. the current sink's backlog, if there's any, is forwarded to the new sink.
1868 ///
1869 /// When this function returns, the calling thread is guaranteed that all future record calls
1870 /// will end up in the new sink.
1871 ///
1872 /// If the batcher's configuration has not been set explicitly or by environment variables,
1873 /// this will change the batcher configuration to the sink's default configuration.
1874 ///
1875 /// ## Data loss
1876 ///
1877 /// If the current sink is in a broken state (e.g. a gRPC sink with a broken connection that
1878 /// cannot be repaired), all pending data in its buffers will be dropped.
1879 pub fn set_sink(&self, new_sink: Box<dyn LogSink>) {
1880 if self.is_forked_child() {
1881 re_log::error_once!(
1882 "Fork detected during set_sink. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
1883 );
1884 return;
1885 }
1886
1887 let timeout = Duration::MAX; // The background thread should block forever if necessary
1888
1889 let f = move |inner: &RecordingStreamInner| {
1890 // NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends
1891 // are safe.
1892
1893 // Flush the batcher down the chunk channel
1894 if let Err(err) = inner.batcher.flush_blocking(timeout) {
1895 re_log::warn!("Failed to flush batcher in `set_sink`: {err}");
1896 }
1897
1898 // Receive pending chunks from the batcher's channel
1899 inner.cmds_tx.send(Command::PopPendingChunks).ok();
1900
1901 // Update the batcher's configuration if it's sink-dependent.
1902 if inner.sink_dependent_batcher_config {
1903 let batcher_config = resolve_batcher_config(None, &*new_sink);
1904 inner.batcher.update_config(batcher_config);
1905 *inner.current_batcher_config.lock() = batcher_config;
1906 }
1907
1908 warn_if_problematic_file_sink_config(&inner.current_batcher_config.lock(), &*new_sink);
1909
1910 // Swap the sink, which will internally make sure to re-ingest the backlog if needed
1911 inner
1912 .cmds_tx
1913 .send(Command::SwapSink { new_sink, timeout })
1914 .ok();
1915
1916 // Before we give control back to the caller, we need to make sure that the swap has
1917 // taken place: we don't want the user to send data to the old sink!
1918 re_log::trace!("Waiting for sink swap to complete…");
1919 let (cmd, oneshot) = Command::flush(timeout);
1920 inner.cmds_tx.send(cmd).ok();
1921 oneshot.recv().ok();
1922 re_log::trace!("Sink swap completed.");
1923 };
1924
1925 if self.with(f).is_none() {
1926 re_log::warn_once!("Recording disabled - call to set_sink() ignored");
1927 }
1928 }
1929
1930 /// Initiates a flush of the pipeline and returns immediately.
1931 ///
1932 /// This does **not** wait for the flush to propagate (see [`Self::flush_blocking`]).
1933 /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
1934 ///
1935 /// This will never return [`SinkFlushError::Timeout`].
1936 pub fn flush_async(&self) -> Result<(), SinkFlushError> {
1937 re_tracing::profile_function!();
1938 match self.flush(None) {
1939 Err(SinkFlushError::Timeout) => Ok(()),
1940 result => result,
1941 }
1942 }
1943
1944 /// Flush the batching pipeline and waits for it to propagate.
1945 ///
1946 /// The function will block until either the flush has completed successfully (`Ok`),
1947 /// an error has occurred (`SinkFlushError::Failed`), or the timeout is reached (`SinkFlushError::Timeout`).
1948 ///
1949 /// Convenience for calling [`Self::flush_with_timeout`] with a timeout of [`Duration::MAX`]
1950 pub fn flush_blocking(&self) -> Result<(), SinkFlushError> {
1951 re_tracing::profile_function!();
1952 self.flush_with_timeout(Duration::MAX)
1953 }
1954
1955 /// Flush the batching pipeline and optionally waits for it to propagate.
1956 /// If you don't want a timeout you can pass in [`Duration::MAX`].
1957 ///
1958 /// The function will block until that timeout is reached,
1959 /// an error occurs, or the flush is complete.
1960 /// The function will only block while there is some hope of progress.
1961 /// For instance: if the underlying gRPC connection is disconnected (or never connected at all),
1962 /// then [`SinkFlushError::Failed`] is returned.
1963 ///
1964 /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
1965 pub fn flush_with_timeout(&self, timeout: Duration) -> Result<(), SinkFlushError> {
1966 re_tracing::profile_function!();
1967 self.flush(Some(timeout))
1968 }
1969
1970 /// Flush the batching pipeline and optionally waits for it to propagate.
1971 ///
1972 /// If `timeout` is `None`, then this function will start the flush, but NOT wait for it to finish.
1973 ///
1974 /// If a `timeout` is given, then the function will block until that timeout is reached,
1975 /// an error occurs, or the flush is complete.
1976 ///
1977 /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
1978 fn flush(&self, timeout: Option<Duration>) -> Result<(), SinkFlushError> {
1979 if self.is_forked_child() {
1980 return Err(SinkFlushError::failed(
1981 "Fork detected during flush. cleanup_if_forked() should always be called after forking. This is likely a bug in the Rerun SDK.",
1982 ));
1983 }
1984
1985 let f = move |inner: &RecordingStreamInner| -> Result<(), SinkFlushError> {
1986 // 0. Wait for all pending importer threads to complete
1987 //
1988 // This ensures that data from `log_file_from_path` and `log_file_from_contents`
1989 // is fully loaded before we flush the batcher and sink.
1990 inner.wait_for_importers();
1991
1992 // 1. Synchronously flush the batcher down the chunk channel
1993 //
1994 // NOTE: This _has_ to be done synchronously as we need to be guaranteed that all chunks
1995 // are ready to be drained by the time this call returns.
1996 // It cannot block indefinitely and is fairly fast as it only requires compute (no I/O).
1997 inner
1998 .batcher
1999 .flush_blocking(Duration::MAX)
2000 .map_err(|err| match err {
2001 BatcherFlushError::Closed => SinkFlushError::failed(err.to_string()),
2002 BatcherFlushError::Timeout => SinkFlushError::Timeout,
2003 })?;
2004
2005 // 2. Drain all pending chunks from the batcher's channel _before_ any other future command
2006 inner
2007 .cmds_tx
2008 .send(Command::PopPendingChunks)
2009 .map_err(|_ignored| {
2010 SinkFlushError::failed(
2011 "Sink shut down prematurely. This is likely a bug in the Rerun SDK.",
2012 )
2013 })?;
2014
2015 // 3. Asynchronously flush everything down the sink
2016 let (cmd, on_done) = Command::flush(Duration::MAX); // The background thread should block forever if necessary
2017 inner.cmds_tx.send(cmd).map_err(|_ignored| {
2018 SinkFlushError::failed(
2019 "Sink shut down prematurely. This is likely a bug in the Rerun SDK.",
2020 )
2021 })?;
2022
2023 if let Some(timeout) = timeout {
2024 on_done.recv_timeout(timeout).map_err(|err| match err {
2025 RecvTimeoutError::Timeout => SinkFlushError::Timeout,
2026 RecvTimeoutError::Disconnected => SinkFlushError::failed(
2027 "Flush never finished. This is likely a bug in the Rerun SDK.",
2028 ),
2029 })??;
2030 }
2031
2032 Ok(())
2033 };
2034
2035 match self.with(f) {
2036 Some(Ok(())) => Ok(()),
2037 Some(Err(err)) => Err(err),
2038 None => {
2039 re_log::warn_once!("Recording disabled - call to flush ignored");
2040 Ok(())
2041 }
2042 }
2043 }
2044}
2045
2046impl RecordingStream {
2047 /// Stream data to multiple different sinks.
2048 ///
2049 /// This is semantically the same as calling [`RecordingStream::set_sink`], but the resulting
2050 /// [`RecordingStream`] will now stream data to multiple sinks at the same time.
2051 ///
2052 /// Currently only supports [`GrpcSink`][grpc_sink] and [`FileSink`][file_sink].
2053 ///
2054 /// If the batcher's configuration has not been set explicitly or by environment variables,
2055 /// This will take over a conservative default of the new sinks.
2056 /// (there's no guarantee on when exactly the new configuration will be active)
2057 ///
2058 /// [grpc_sink]: crate::sink::GrpcSink
2059 /// [file_sink]: crate::sink::FileSink
2060 pub fn set_sinks(&self, sinks: impl crate::log_sink::IntoMultiSink) {
2061 if forced_sink_path().is_some() {
2062 re_log::debug!("Ignored setting new MultiSink since {ENV_FORCE_SAVE} is set");
2063 return;
2064 }
2065
2066 let sink = sinks.into_multi_sink();
2067
2068 self.set_sink(Box::new(sink));
2069 }
2070
2071 /// Asynchronously calls a method that has read access to the currently active sink.
2072 ///
2073 /// Since a recording stream's sink is owned by a different thread there is no guarantee when
2074 /// the callback is going to be called.
2075 /// It's advised to return as quickly as possible from the callback since
2076 /// as long as the callback doesn't return, the sink will not receive any new data,
2077 ///
2078 /// # Experimental
2079 ///
2080 /// This is an experimental API and may change in future releases.
2081 // TODO(#10444): This should become a lot more straight forward with explicit sinks.
2082 pub fn inspect_sink(&self, f: impl FnOnce(&dyn LogSink) + Send + 'static) {
2083 self.with(|inner| inner.cmds_tx.send(Command::InspectSink(Box::new(f))).ok());
2084 }
2085
2086 /// Swaps the underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to use
2087 /// the specified address.
2088 ///
2089 /// See also [`Self::connect_grpc_opts`] if you wish to configure the connection.
2090 ///
2091 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2092 /// terms of data durability and ordering.
2093 /// See [`Self::set_sink`] for more information.
2094 pub fn connect_grpc(&self) -> RecordingStreamResult<()> {
2095 self.connect_grpc_opts(format!(
2096 "rerun+http://127.0.0.1:{}/proxy",
2097 crate::DEFAULT_SERVER_PORT
2098 ))
2099 }
2100
2101 /// Swaps the underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to use
2102 /// the specified address.
2103 ///
2104 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2105 /// terms of data durability and ordering.
2106 /// See [`Self::set_sink`] for more information.
2107 ///
2108 /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
2109 /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
2110 /// call to `flush` to block indefinitely if a connection cannot be established.
2111 pub fn connect_grpc_opts(&self, url: impl Into<String>) -> RecordingStreamResult<()> {
2112 if forced_sink_path().is_some() {
2113 re_log::debug!("Ignored setting new GrpcSink since {ENV_FORCE_SAVE} is set");
2114 return Ok(());
2115 }
2116
2117 let url: String = url.into();
2118 let re_uri::RedapUri::Proxy(uri) = url.as_str().parse()? else {
2119 return Err(RecordingStreamError::NotAProxyEndpoint);
2120 };
2121
2122 let sink = crate::log_sink::GrpcSink::new(uri);
2123
2124 self.set_sink(Box::new(sink));
2125 Ok(())
2126 }
2127
#[cfg(feature = "server")]
/// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] that binds to
/// `0.0.0.0` on [`crate::DEFAULT_SERVER_PORT`].
///
/// This is shorthand for [`Self::serve_grpc_opts`] with those defaults; use that method
/// directly to configure the gRPC server's IP and port.
///
/// You can connect a viewer to it with `rerun --connect`.
///
/// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
/// You can limit the amount of data buffered by the gRPC server with the `server_options` argument.
/// Once reached, the earliest logged data will be dropped. Static data is never dropped.
pub fn serve_grpc(
    &self,
    server_options: re_grpc_server::ServerOptions,
) -> RecordingStreamResult<()> {
    self.serve_grpc_opts("0.0.0.0", crate::DEFAULT_SERVER_PORT, server_options)
}
2145
#[cfg(feature = "server")]
/// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] listening on
/// `rerun+http://{bind_ip}:{port}/proxy`.
///
/// `0.0.0.0` is a good default for `bind_ip`.
///
/// The gRPC server keeps all log data buffered in memory, so that viewers connecting late
/// still get all the data. Use `server_options` to limit how much is buffered: once the limit
/// is reached, the earliest logged data is dropped. Static data is never dropped.
///
/// The call is ignored (returning `Ok(())`) when the force-save environment variable is set.
pub fn serve_grpc_opts(
    &self,
    bind_ip: impl AsRef<str>,
    port: u16,
    server_options: re_grpc_server::ServerOptions,
) -> RecordingStreamResult<()> {
    if forced_sink_path().is_none() {
        let server_sink =
            crate::grpc_server::GrpcServerSink::new(bind_ip.as_ref(), port, server_options)?;
        self.set_sink(Box::new(server_sink));
    } else {
        re_log::debug!("Ignored setting GrpcServerSink since {ENV_FORCE_SAVE} is set");
    }

    Ok(())
}
2171
2172 /// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
2173 /// underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to send data to that
2174 /// new process.
2175 ///
2176 /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
2177 /// that viewer instead of starting a new one.
2178 ///
2179 /// See also [`Self::spawn_opts`] if you wish to configure the behavior of thew Rerun process
2180 /// as well as the underlying connection.
2181 ///
2182 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2183 /// terms of data durability and ordering.
2184 /// See [`Self::set_sink`] for more information.
2185 pub fn spawn(&self) -> RecordingStreamResult<()> {
2186 self.spawn_opts(&Default::default())
2187 }
2188
2189 /// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
2190 /// underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to send data to that
2191 /// new process.
2192 ///
2193 /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
2194 /// that viewer instead of starting a new one.
2195 ///
2196 /// The behavior of the spawned Viewer can be configured via `opts`.
2197 /// If you're fine with the default behavior, refer to the simpler [`Self::spawn`].
2198 ///
2199 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2200 /// terms of data durability and ordering.
2201 /// See [`Self::set_sink`] for more information.
2202 ///
2203 /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
2204 /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
2205 /// call to `flush` to block indefinitely if a connection cannot be established.
2206 pub fn spawn_opts(&self, opts: &crate::SpawnOptions) -> RecordingStreamResult<()> {
2207 if !self.is_enabled() {
2208 re_log::debug!("Rerun disabled - call to spawn() ignored");
2209 return Ok(());
2210 }
2211 if forced_sink_path().is_some() {
2212 re_log::debug!("Ignored setting new GrpcSink since {ENV_FORCE_SAVE} is set");
2213 return Ok(());
2214 }
2215
2216 let actual_port = crate::spawn(opts)?;
2217 let addr = std::net::SocketAddr::new(
2218 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2219 actual_port,
2220 );
2221 self.connect_grpc_opts(format!("rerun+http://{addr}/proxy"))?;
2222
2223 Ok(())
2224 }
2225
2226 /// Swaps the underlying sink for a [`crate::sink::MemorySink`] sink and returns the associated
2227 /// [`MemorySinkStorage`].
2228 ///
2229 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2230 /// terms of data durability and ordering.
2231 /// See [`Self::set_sink`] for more information.
2232 pub fn memory(&self) -> MemorySinkStorage {
2233 let sink = crate::sink::MemorySink::new(self.clone());
2234 let storage = sink.buffer();
2235 self.set_sink(Box::new(sink));
2236 storage
2237 }
2238
2239 /// Swaps the underlying sink for a [`crate::sink::BinaryStreamSink`] sink and returns the associated
2240 /// [`BinaryStreamStorage`].
2241 ///
2242 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2243 /// terms of data durability and ordering.
2244 /// See [`Self::set_sink`] for more information.
2245 pub fn binary_stream(&self) -> BinaryStreamStorage {
2246 let (sink, storage) = crate::sink::BinaryStreamSink::new(self.clone());
2247 self.set_sink(Box::new(sink));
2248 storage
2249 }
2250
2251 /// Swaps the underlying sink for a [`crate::sink::FileSink`] at the specified `path`.
2252 ///
2253 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2254 /// terms of data durability and ordering.
2255 /// See [`Self::set_sink`] for more information.
2256 pub fn save(
2257 &self,
2258 path: impl Into<std::path::PathBuf>,
2259 ) -> Result<(), crate::sink::FileSinkError> {
2260 self.save_opts(path)
2261 }
2262
2263 /// Swaps the underlying sink for a [`crate::sink::FileSink`] at the specified `path`.
2264 ///
2265 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2266 /// terms of data durability and ordering.
2267 /// See [`Self::set_sink`] for more information.
2268 ///
2269 /// If a blueprint was provided, it will be stored first in the file.
2270 /// Blueprints are currently an experimental part of the Rust SDK.
2271 pub fn save_opts(
2272 &self,
2273 path: impl Into<std::path::PathBuf>,
2274 ) -> Result<(), crate::sink::FileSinkError> {
2275 if forced_sink_path().is_some() {
2276 re_log::debug!("Ignored setting new file since {ENV_FORCE_SAVE} is set");
2277 return Ok(());
2278 }
2279
2280 let sink = crate::sink::FileSink::new(path)?;
2281
2282 self.set_sink(Box::new(sink));
2283
2284 Ok(())
2285 }
2286
2287 /// Swaps the underlying sink for a [`crate::sink::FileSink`] pointed at stdout.
2288 ///
2289 /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
2290 /// default back to `buffered` mode, in order not to break the user's terminal.
2291 ///
2292 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2293 /// terms of data durability and ordering.
2294 /// See [`Self::set_sink`] for more information.
2295 pub fn stdout(&self) -> Result<(), crate::sink::FileSinkError> {
2296 self.stdout_opts()
2297 }
2298
2299 /// Swaps the underlying sink for a [`crate::sink::FileSink`] pointed at stdout.
2300 ///
2301 /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
2302 /// default back to `buffered` mode, in order not to break the user's terminal.
2303 ///
2304 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2305 /// terms of data durability and ordering.
2306 /// See [`Self::set_sink`] for more information.
2307 ///
2308 /// If a blueprint was provided, it will be stored first in the file.
2309 /// Blueprints are currently an experimental part of the Rust SDK.
2310 pub fn stdout_opts(&self) -> Result<(), crate::sink::FileSinkError> {
2311 if forced_sink_path().is_some() {
2312 re_log::debug!("Ignored setting new file since {ENV_FORCE_SAVE} is set");
2313 return Ok(());
2314 }
2315
2316 if std::io::stdout().is_terminal() {
2317 re_log::debug!("Ignored call to stdout() because stdout is a terminal");
2318 self.set_sink(Box::new(crate::log_sink::BufferedSink::new()));
2319 return Ok(());
2320 }
2321
2322 let sink = crate::sink::FileSink::stdout()?;
2323
2324 self.set_sink(Box::new(sink));
2325
2326 Ok(())
2327 }
2328
2329 /// Swaps the underlying sink for a [`crate::sink::BufferedSink`].
2330 ///
2331 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2332 /// terms of data durability and ordering.
2333 /// See [`Self::set_sink`] for more information.
2334 pub fn disconnect(&self) {
2335 let f = move |inner: &RecordingStreamInner| {
2336 // When disconnecting, we need to make sure that pending top-level importer threads that
2337 // were started from the SDK run to completion.
2338 inner.wait_for_importers();
2339 self.set_sink(Box::new(crate::sink::BufferedSink::new()));
2340 };
2341
2342 if self.with(f).is_none() {
2343 re_log::warn_once!("Recording disabled - call to disconnect() ignored");
2344 }
2345 }
2346
2347 /// Finalize any sinks whose on-disk format only completes at shutdown (i.e. file-like sinks
2348 /// that write a footer at the end), while leaving streaming sinks (e.g. gRPC) intact.
2349 ///
2350 /// For a bare deferring sink (e.g. `FileSink` from `save()`), this is equivalent to
2351 /// [`Self::disconnect`]: the sink is replaced with a [`crate::sink::BufferedSink`] and its
2352 /// `Drop` impl runs the writer thread to completion, which is what emits the footer.
2353 ///
2354 /// For a [`crate::log_sink::MultiSink`] containing a mix of deferring and streaming children,
2355 /// only the deferring children are dropped. The streaming children remain live and the
2356 /// `MultiSink` continues to receive new messages.
2357 ///
2358 /// For all other sinks this is a no-op.
2359 ///
2360 /// Used by Python's `RecordingStream.__exit__` so file-backed recordings are consumable as
2361 /// soon as the `with`-block exits, without waiting for `__del__` / GC.
2362 pub fn finalize_deferred_sinks(&self) {
2363 let timeout = Duration::MAX;
2364 let f = move |inner: &RecordingStreamInner| {
2365 inner.wait_for_importers();
2366
2367 // Flush the batcher down the chunk channel so any pending data lands in the sink
2368 // before we tear it down.
2369 if let Err(err) = inner.batcher.flush_blocking(timeout) {
2370 re_log::warn!("Failed to flush batcher in `finalize_deferred_sinks`: {err}");
2371 }
2372 inner.cmds_tx.send(Command::PopPendingChunks).ok();
2373
2374 let (cmd, oneshot) = Command::finalize_deferred_sinks(timeout);
2375 inner.cmds_tx.send(cmd).ok();
2376 oneshot.recv().ok();
2377 };
2378
2379 if self.with(f).is_none() {
2380 re_log::warn_once!("Recording disabled - call to finalize_deferred_sinks() ignored");
2381 }
2382 }
2383
2384 /// Send a blueprint through this recording stream.
2385 pub fn send_blueprint(
2386 &self,
2387 blueprint: Vec<LogMsg>,
2388 activation_cmd: BlueprintActivationCommand,
2389 ) {
2390 let mut blueprint_id = None;
2391 for msg in blueprint {
2392 if blueprint_id.is_none() {
2393 blueprint_id = Some(msg.store_id().clone());
2394 }
2395 self.record_msg(msg);
2396 }
2397
2398 if let Some(blueprint_id) = blueprint_id {
2399 if blueprint_id == activation_cmd.blueprint_id {
2400 // Let the viewer know that the blueprint has been fully received,
2401 // and that it can now be activated.
2402 // We don't want to activate half-loaded blueprints, because that can be confusing,
2403 // and can also lead to problems with view heuristics.
2404 self.record_msg(activation_cmd.into());
2405 } else {
2406 re_log::warn!(
2407 "Blueprint ID mismatch when sending blueprint: {:?} != {:?}. Ignoring activation.",
2408 blueprint_id,
2409 activation_cmd.blueprint_id
2410 );
2411 }
2412 }
2413 }
2414
2415 /// Send a [`crate::blueprint::Blueprint`] to configure the viewer layout.
2416 pub fn send_blueprint_opts(
2417 &self,
2418 opts: &crate::blueprint::BlueprintOpts,
2419 ) -> RecordingStreamResult<()> {
2420 opts.blueprint.send(self, opts.activation)
2421 }
2422}
2423
2424impl fmt::Debug for RecordingStream {
2425 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2426 let with = |inner: &RecordingStreamInner| {
2427 let RecordingStreamInner {
2428 // This pattern match prevents _accidentally_ omitting data from the debug output
2429 // when new fields are added.
2430 store_info,
2431 recording_info,
2432 tick,
2433 cmds_tx: _,
2434 batcher: _,
2435 batcher_to_sink_handle: _,
2436 current_batcher_config,
2437 sink_dependent_batcher_config,
2438 importer_handles,
2439 pid_at_creation,
2440 } = inner;
2441
2442 f.debug_struct("RecordingStream")
2443 .field("store_info", &store_info)
2444 .field("recording_info", &recording_info)
2445 .field("tick", &tick)
2446 .field("current_batcher_config", &*current_batcher_config.lock())
2447 .field(
2448 "sink_dependent_batcher_config",
2449 &sink_dependent_batcher_config,
2450 )
2451 .field("pending_importers", &importer_handles.lock().len())
2452 .field("pid_at_creation", &pid_at_creation)
2453 .finish_non_exhaustive()
2454 };
2455
2456 match self.with(with) {
2457 Some(res) => res,
2458 None => write!(f, "RecordingStream {{ disabled }}"),
2459 }
2460 }
2461}
2462
2463// --- Stateful time ---
2464
2465/// Thread-local data.
2466#[derive(Default)]
2467struct ThreadInfo {
2468 /// The current time per-thread per-recording, which can be set by users.
2469 timepoints: HashMap<StoreId, TimePoint>,
2470}
2471
2472impl ThreadInfo {
2473 fn thread_now(rid: &StoreId) -> TimePoint {
2474 Self::with(|ti| ti.now(rid))
2475 }
2476
2477 fn set_thread_time(rid: &StoreId, timeline: TimelineName, cell: TimeCell) {
2478 Self::with(|ti| ti.set_time(rid, timeline, cell));
2479 }
2480
2481 fn unset_thread_time(rid: &StoreId, timeline: &TimelineName) {
2482 Self::with(|ti| ti.unset_time(rid, timeline));
2483 }
2484
2485 fn reset_thread_time(rid: &StoreId) {
2486 Self::with(|ti| ti.reset_time(rid));
2487 }
2488
2489 /// Get access to the thread-local [`ThreadInfo`].
2490 fn with<R>(f: impl FnOnce(&mut Self) -> R) -> R {
2491 use std::cell::RefCell;
2492 thread_local! {
2493 static THREAD_INFO: RefCell<Option<ThreadInfo>> = const { RefCell::new(None) };
2494 }
2495
2496 THREAD_INFO.with(|thread_info| {
2497 let mut thread_info = thread_info.borrow_mut();
2498 let thread_info = thread_info.get_or_insert_with(Self::default);
2499 f(thread_info)
2500 })
2501 }
2502
2503 fn now(&self, rid: &StoreId) -> TimePoint {
2504 let mut timepoint = self.timepoints.get(rid).cloned().unwrap_or_default();
2505 timepoint.insert_cell(TimelineName::log_time(), TimeCell::timestamp_now());
2506 timepoint
2507 }
2508
2509 fn set_time(&mut self, rid: &StoreId, timeline: TimelineName, cell: TimeCell) {
2510 self.timepoints
2511 .entry(rid.clone())
2512 .or_default()
2513 .insert_cell(timeline, cell);
2514 }
2515
2516 fn unset_time(&mut self, rid: &StoreId, timeline: &TimelineName) {
2517 if let Some(timepoint) = self.timepoints.get_mut(rid) {
2518 timepoint.remove(timeline);
2519 }
2520 }
2521
2522 fn reset_time(&mut self, rid: &StoreId) {
2523 if let Some(timepoint) = self.timepoints.get_mut(rid) {
2524 *timepoint = TimePoint::default();
2525 }
2526 }
2527}
2528
2529impl RecordingStream {
2530 /// Returns the current time of the recording on the current thread.
2531 pub fn now(&self) -> TimePoint {
2532 let f =
2533 move |inner: &RecordingStreamInner| ThreadInfo::thread_now(&inner.store_info.store_id);
2534 if let Some(res) = self.with(f) {
2535 res
2536 } else {
2537 re_log::warn_once!("Recording disabled - call to now() ignored");
2538 TimePoint::default()
2539 }
2540 }
2541
2542 /// Set the current time of the recording, for the current calling thread.
2543 ///
2544 /// Used for all subsequent logging performed from this same thread, until the next call
2545 /// to one of the index/time setting methods.
2546 ///
2547 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2548 ///
2549 /// See also:
2550 /// - [`Self::set_time`]
2551 /// - [`Self::set_time_sequence`]
2552 /// - [`Self::set_duration_secs`]
2553 /// - [`Self::disable_timeline`]
2554 /// - [`Self::reset_time`]
2555 pub fn set_timepoint(&self, timepoint: impl Into<TimePoint>) {
2556 let f = move |inner: &RecordingStreamInner| {
2557 let timepoint = timepoint.into();
2558 for (timeline, time) in timepoint {
2559 ThreadInfo::set_thread_time(&inner.store_info.store_id, timeline, time);
2560 }
2561 };
2562
2563 if self.with(f).is_none() {
2564 re_log::warn_once!("Recording disabled - call to set_timepoint() ignored");
2565 }
2566 }
2567
2568 /// Set the current value of one of the timelines.
2569 ///
2570 /// Used for all subsequent logging performed from this same thread, until the next call
2571 /// to one of the index/time setting methods.
2572 ///
2573 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2574 ///
2575 /// Example:
2576 /// ```no_run
2577 /// # mod rerun { pub use re_sdk::*; }
2578 /// # let rec: rerun::RecordingStream = unimplemented!();
2579 /// rec.set_time("frame_nr", rerun::TimeCell::from_sequence(42));
2580 /// rec.set_time("duration", std::time::Duration::from_millis(123));
2581 /// rec.set_time("capture_time", std::time::SystemTime::now());
2582 /// ```
2583 ///
2584 /// See also:
2585 /// - [`Self::set_timepoint`]
2586 /// - [`Self::set_time_sequence`]
2587 /// - [`Self::set_duration_secs`]
2588 /// - [`Self::disable_timeline`]
2589 /// - [`Self::reset_time`]
2590 pub fn set_time(&self, timeline: impl Into<TimelineName>, value: impl TryInto<TimeCell>) {
2591 let f = move |inner: &RecordingStreamInner| {
2592 let timeline = timeline.into();
2593 if let Ok(value) = value.try_into() {
2594 ThreadInfo::set_thread_time(&inner.store_info.store_id, timeline, value);
2595 } else {
2596 re_log::warn_once!(
2597 "set_time({timeline}): Failed to convert the given value to an TimeCell"
2598 );
2599 }
2600 };
2601
2602 if self.with(f).is_none() {
2603 re_log::warn_once!("Recording disabled - call to set_time() ignored");
2604 }
2605 }
2606
2607 /// Set the current time of the recording, for the current calling thread.
2608 ///
2609 /// Short for `set_time(timeline, rerun::TimeCell::from_sequence(sequence))`.
2610 ///
2611 /// Used for all subsequent logging performed from this same thread, until the next call
2612 /// to one of the index/time setting methods.
2613 ///
2614 /// For example: `rec.set_time_sequence("frame_nr", frame_nr)`.
2615 /// You can remove a timeline again using `rec.disable_timeline("frame_nr")`.
2616 ///
2617 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2618 ///
2619 /// See also:
2620 /// - [`Self::set_time`]
2621 /// - [`Self::set_timepoint`]
2622 /// - [`Self::set_duration_secs`]
2623 /// - [`Self::disable_timeline`]
2624 /// - [`Self::reset_time`]
2625 #[inline]
2626 pub fn set_time_sequence(&self, timeline: impl Into<TimelineName>, sequence: impl Into<i64>) {
2627 self.set_time(timeline, TimeCell::from_sequence(sequence.into()));
2628 }
2629
2630 /// Set the current time of the recording, for the current calling thread.
2631 ///
2632 /// Short for `set_time(timeline, std::time::Duration::from_secs_f64(secs))`..
2633 ///
2634 /// Used for all subsequent logging performed from this same thread, until the next call
2635 /// to one of the index/time setting methods.
2636 ///
2637 /// For example: `rec.set_duration_secs("time_since_start", time_offset)`.
2638 /// You can remove a timeline again using `rec.disable_timeline("time_since_start")`.
2639 ///
2640 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2641 ///
2642 /// See also:
2643 /// - [`Self::set_time`]
2644 /// - [`Self::set_timepoint`]
2645 /// - [`Self::set_timestamp_secs_since_epoch`]
2646 /// - [`Self::set_time_sequence`]
2647 /// - [`Self::disable_timeline`]
2648 /// - [`Self::reset_time`]
2649 #[inline]
2650 pub fn set_duration_secs(&self, timeline: impl Into<TimelineName>, secs: impl Into<f64>) {
2651 let secs = secs.into();
2652 if let Ok(duration) = std::time::Duration::try_from_secs_f64(secs) {
2653 self.set_time(timeline, duration);
2654 } else {
2655 re_log::error_once!("set_duration_secs: can't set time to {secs}");
2656 }
2657 }
2658
2659 /// Set a timestamp as seconds since Unix epoch (1970-01-01 00:00:00 UTC).
2660 ///
2661 /// Short for `self.set_time(timeline, rerun::TimeCell::from_timestamp_secs_since_epoch(secs))`.
2662 ///
2663 /// Used for all subsequent logging performed from this same thread, until the next call
2664 /// to one of the index/time setting methods.
2665 ///
2666 /// You can remove a timeline again using `rec.disable_timeline(timeline)`.
2667 ///
2668 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2669 ///
2670 /// See also:
2671 /// - [`Self::set_time`]
2672 /// - [`Self::set_timepoint`]
2673 /// - [`Self::set_duration_secs`]
2674 /// - [`Self::set_time_sequence`]
2675 /// - [`Self::set_timestamp_nanos_since_epoch`]
2676 /// - [`Self::disable_timeline`]
2677 /// - [`Self::reset_time`]
2678 #[inline]
2679 pub fn set_timestamp_secs_since_epoch(
2680 &self,
2681 timeline: impl Into<TimelineName>,
2682 secs: impl Into<f64>,
2683 ) {
2684 self.set_time(
2685 timeline,
2686 TimeCell::from_timestamp_secs_since_epoch(secs.into()),
2687 );
2688 }
2689
2690 /// Set a timestamp as nanoseconds since Unix epoch (1970-01-01 00:00:00 UTC).
2691 ///
2692 /// Short for `self.set_time(timeline, rerun::TimeCell::set_timestamp_nanos_since_epoch(secs))`.
2693 ///
2694 /// Used for all subsequent logging performed from this same thread, until the next call
2695 /// to one of the index/time setting methods.
2696 ///
2697 /// You can remove a timeline again using `rec.disable_timeline(timeline)`.
2698 ///
2699 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2700 ///
2701 /// See also:
2702 /// - [`Self::set_time`]
2703 /// - [`Self::set_timepoint`]
2704 /// - [`Self::set_duration_secs`]
2705 /// - [`Self::set_time_sequence`]
2706 /// - [`Self::set_timestamp_secs_since_epoch`]
2707 /// - [`Self::disable_timeline`]
2708 /// - [`Self::reset_time`]
2709 #[inline]
2710 pub fn set_timestamp_nanos_since_epoch(
2711 &self,
2712 timeline: impl Into<TimelineName>,
2713 nanos: impl Into<i64>,
2714 ) {
2715 self.set_time(
2716 timeline,
2717 TimeCell::from_timestamp_nanos_since_epoch(nanos.into()),
2718 );
2719 }
2720
2721 /// Clears out the current time of the recording for the specified timeline, for the
2722 /// current calling thread.
2723 ///
2724 /// For example: `rec.disable_timeline("frame")`, `rec.disable_timeline("sim_time")`.
2725 ///
2726 /// See also:
2727 /// - [`Self::set_timepoint`]
2728 /// - [`Self::set_time_sequence`]
2729 /// - [`Self::reset_time`]
2730 pub fn disable_timeline(&self, timeline: impl Into<TimelineName>) {
2731 let f = move |inner: &RecordingStreamInner| {
2732 let timeline = timeline.into();
2733 ThreadInfo::unset_thread_time(&inner.store_info.store_id, &timeline);
2734 };
2735
2736 if self.with(f).is_none() {
2737 re_log::warn_once!("Recording disabled - call to disable_timeline() ignored");
2738 }
2739 }
2740
2741 /// Clears out the current time of the recording, for the current calling thread.
2742 ///
2743 /// Used for all subsequent logging performed from this same thread, until the next call
2744 /// to one of the index/time setting methods.
2745 ///
2746 /// For example: `rec.reset_time()`.
2747 ///
2748 /// See also:
2749 /// - [`Self::set_timepoint`]
2750 /// - [`Self::set_time_sequence`]
2751 /// - [`Self::disable_timeline`]
2752 pub fn reset_time(&self) {
2753 let f = move |inner: &RecordingStreamInner| {
2754 ThreadInfo::reset_thread_time(&inner.store_info.store_id);
2755 };
2756
2757 if self.with(f).is_none() {
2758 re_log::warn_once!("Recording disabled - call to reset_time() ignored");
2759 }
2760 }
2761}
2762
2763// ---
2764
2765#[cfg(test)]
2766mod tests {
2767 use insta::assert_debug_snapshot;
2768 use itertools::Itertools as _;
2769 use re_log_types::example_components::{MyLabel, MyPoints};
2770 use re_sdk_types::SerializedComponentBatch;
2771
2772 use super::*;
2773
2774 struct DisplayDescrs(Chunk);
2775
2776 impl std::fmt::Debug for DisplayDescrs {
2777 #[inline]
2778 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2779 f.debug_list()
2780 .entries(
2781 self.0
2782 .component_descriptors()
2783 .map(|d| d.display_name().to_owned())
2784 .sorted(),
2785 )
2786 .finish()
2787 }
2788 }
2789
2790 #[test]
2791 fn impl_send_sync() {
2792 fn assert_send_sync<T: Send + Sync>() {}
2793 assert_send_sync::<RecordingStream>();
2794 }
2795
2796 #[test]
2797 fn never_flush() {
2798 let rec = RecordingStreamBuilder::new("rerun_example_never_flush")
2799 .enabled(true)
2800 .batcher_config(ChunkBatcherConfig::NEVER)
2801 .buffered()
2802 .unwrap();
2803
2804 let store_info = rec.store_info().unwrap();
2805
2806 let rows = example_rows(false);
2807 for row in rows.clone() {
2808 rec.record_row("a".into(), row, false);
2809 }
2810
2811 let storage = rec.memory();
2812 let mut msgs = {
2813 let mut msgs = storage.take();
2814 msgs.reverse();
2815 msgs
2816 };
2817
2818 // First message should be a set_store_info resulting from the original sink swap to
2819 // buffered mode.
2820 match msgs.pop().unwrap() {
2821 LogMsg::SetStoreInfo(msg) => {
2822 assert!(msg.row_id != *RowId::ZERO);
2823 similar_asserts::assert_eq!(store_info, msg.info);
2824 }
2825 _ => panic!("expected SetStoreInfo"),
2826 }
2827
2828 // Second message should be a set_store_info resulting from the later sink swap from
2829 // buffered mode into in-memory mode.
2830 // This arrives _before_ the data itself since we're using manual flushing.
2831 match msgs.pop().unwrap() {
2832 LogMsg::SetStoreInfo(msg) => {
2833 assert!(msg.row_id != *RowId::ZERO);
2834 similar_asserts::assert_eq!(store_info, msg.info);
2835 }
2836 _ => panic!("expected SetStoreInfo"),
2837 }
2838
2839 // The following flushes were sent as a result of the implicit flush when swapping the
2840 // underlying sink from buffered to in-memory.
2841
2842 // Chunk that contains the `RecordingInfo`.
2843 match msgs.pop().unwrap() {
2844 LogMsg::ArrowMsg(rid, msg) => {
2845 assert_eq!(store_info.store_id, rid);
2846
2847 let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2848
2849 chunk.sanity_check().unwrap();
2850
2851 assert_debug_snapshot!(DisplayDescrs(chunk));
2852 }
2853 _ => panic!("expected ArrowMsg"),
2854 }
2855
2856 // Final message is the batched chunk itself.
2857 match msgs.pop().unwrap() {
2858 LogMsg::ArrowMsg(rid, msg) => {
2859 assert_eq!(store_info.store_id, rid);
2860
2861 let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2862
2863 chunk.sanity_check().unwrap();
2864
2865 assert_debug_snapshot!(DisplayDescrs(chunk));
2866 }
2867 _ => panic!("expected ArrowMsg"),
2868 }
2869
2870 // That's all.
2871 assert!(msgs.pop().is_none());
2872 }
2873
2874 #[test]
2875 fn always_flush() {
2876 let rec = RecordingStreamBuilder::new("rerun_example_always_flush")
2877 .enabled(true)
2878 .batcher_config(ChunkBatcherConfig::ALWAYS_TEST_ONLY)
2879 .buffered()
2880 .unwrap();
2881
2882 let store_info = rec.store_info().unwrap();
2883
2884 let rows = example_rows(false);
2885 for row in rows.clone() {
2886 rec.record_row("a".into(), row, false);
2887 }
2888
2889 let storage = rec.memory();
2890 let mut msgs = {
2891 let mut msgs = storage.take();
2892 msgs.reverse();
2893 msgs
2894 };
2895
2896 // First message should be a set_store_info resulting from the original sink swap to
2897 // buffered mode.
2898 match msgs.pop().unwrap() {
2899 LogMsg::SetStoreInfo(msg) => {
2900 assert!(msg.row_id != *RowId::ZERO);
2901 similar_asserts::assert_eq!(store_info, msg.info);
2902 }
2903 _ => panic!("expected SetStoreInfo"),
2904 }
2905
2906 // Second message should be a set_store_info resulting from the later sink swap from
2907 // buffered mode into in-memory mode.
2908 // This arrives _before_ the data itself since we're using manual flushing.
2909 match msgs.pop().unwrap() {
2910 LogMsg::SetStoreInfo(msg) => {
2911 assert!(msg.row_id != *RowId::ZERO);
2912 similar_asserts::assert_eq!(store_info, msg.info);
2913 }
2914 _ => panic!("expected SetStoreInfo"),
2915 }
2916
2917 let mut assert_next_row = || match msgs.pop().unwrap() {
2918 LogMsg::ArrowMsg(rid, msg) => {
2919 assert_eq!(store_info.store_id, rid);
2920
2921 let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2922
2923 chunk.sanity_check().unwrap();
2924
2925 assert_debug_snapshot!(DisplayDescrs(chunk));
2926 }
2927 _ => panic!("expected ArrowMsg"),
2928 };
2929
2930 // 3rd, 4th, 5th, and 6th messages are all the single-row batched chunks themselves,
2931 // which were sent as a result of the implicit flush when swapping the underlying sink
2932 // from buffered to in-memory. Note that these messages contain the 2 recording property
2933 // chunks.
2934 assert_next_row(); // Contains `RecordingInfo`
2935 assert_next_row();
2936 assert_next_row();
2937 assert_next_row();
2938
2939 // That's all.
2940 assert!(msgs.pop().is_none());
2941 }
2942
2943 #[test]
2944 fn flush_hierarchy() {
2945 let (rec, storage) = RecordingStreamBuilder::new("rerun_example_flush_hierarchy")
2946 .enabled(true)
2947 .batcher_config(ChunkBatcherConfig::NEVER)
2948 .memory()
2949 .unwrap();
2950
2951 let store_info = rec.store_info().unwrap();
2952
2953 let rows = example_rows(false);
2954 for row in rows.clone() {
2955 rec.record_row("a".into(), row, false);
2956 }
2957
2958 {
2959 let mut msgs = {
2960 let mut msgs = storage.take();
2961 msgs.reverse();
2962 msgs
2963 };
2964
2965 // First message should be a set_store_info resulting from the original sink swap
2966 // to in-memory mode.
2967 match msgs.pop().unwrap() {
2968 LogMsg::SetStoreInfo(msg) => {
2969 assert!(msg.row_id != *RowId::ZERO);
2970 similar_asserts::assert_eq!(store_info, msg.info);
2971 }
2972 _ => panic!("expected SetStoreInfo"),
2973 }
2974
2975 // For reasons, MemorySink ends up with 2 StoreInfos.
2976 // TODO(jleibs): Avoid a redundant StoreInfo message.
2977 match msgs.pop().unwrap() {
2978 LogMsg::SetStoreInfo(msg) => {
2979 assert!(msg.row_id != *RowId::ZERO);
2980 similar_asserts::assert_eq!(store_info, msg.info);
2981 }
2982 _ => panic!("expected SetStoreInfo"),
2983 }
2984
2985 // MemorySinkStorage transparently handles flushing during `take()`!
2986
2987 // The batch that contains the `RecordingInfo`.
2988 match msgs.pop().unwrap() {
2989 LogMsg::ArrowMsg(rid, msg) => {
2990 assert_eq!(store_info.store_id, rid);
2991
2992 let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2993
2994 chunk.sanity_check().unwrap();
2995
2996 assert_debug_snapshot!(DisplayDescrs(chunk));
2997 }
2998 _ => panic!("expected ArrowMsg"),
2999 }
3000
3001 // The batched chunk itself, which was sent as a result of the explicit flush above.
3002 match msgs.pop().unwrap() {
3003 LogMsg::ArrowMsg(rid, msg) => {
3004 assert_eq!(store_info.store_id, rid);
3005
3006 let chunk = Chunk::from_arrow_msg(&msg).unwrap();
3007
3008 chunk.sanity_check().unwrap();
3009
3010 assert_debug_snapshot!(DisplayDescrs(chunk));
3011 }
3012 _ => panic!("expected ArrowMsg"),
3013 }
3014
3015 // That's all.
3016 assert!(msgs.pop().is_none());
3017 }
3018 }
3019
3020 #[test]
3021 fn disabled() {
3022 let (rec, storage) = RecordingStreamBuilder::new("rerun_example_disabled")
3023 .enabled(false)
3024 .batcher_config(ChunkBatcherConfig::ALWAYS_TEST_ONLY)
3025 .memory()
3026 .unwrap();
3027
3028 assert_eq!(rec.ref_count(), 0);
3029
3030 let rows = example_rows(false);
3031 for row in rows.clone() {
3032 rec.record_row("a".into(), row, false);
3033 }
3034
3035 let mut msgs = {
3036 let mut msgs = storage.take();
3037 msgs.reverse();
3038 msgs
3039 };
3040
3041 // That's all.
3042 assert!(msgs.pop().is_none());
3043 }
3044
3045 #[test]
3046 fn test_set_thread_local() {
3047 // Regression-test for https://github.com/rerun-io/rerun/issues/2889
3048 std::thread::Builder::new()
3049 .name("test_thead".to_owned())
3050 .spawn(|| {
3051 let stream = RecordingStreamBuilder::new("rerun_example_test")
3052 .buffered()
3053 .unwrap();
3054 RecordingStream::set_thread_local(StoreKind::Recording, Some(stream));
3055 })
3056 .unwrap()
3057 .join()
3058 .unwrap();
3059 }
3060
3061 fn example_rows(static_: bool) -> Vec<PendingRow> {
3062 use re_log_types::example_components::{MyColor, MyLabel, MyPoint};
3063 use re_sdk_types::Loggable;
3064
3065 let mut tick = 0i64;
3066 let mut timepoint = |frame_nr: i64| {
3067 let mut tp = TimePoint::default();
3068 if !static_ {
3069 tp.insert(Timeline::log_time(), re_log_types::Timestamp::now());
3070 tp.insert(Timeline::log_tick(), tick);
3071 tp.insert(Timeline::new_sequence("frame_nr"), frame_nr);
3072 }
3073 tick += 1;
3074 tp
3075 };
3076
3077 let row0 = {
3078 PendingRow {
3079 row_id: RowId::new(),
3080 timepoint: timepoint(1),
3081 components: [
3082 (
3083 MyPoints::descriptor_points().component,
3084 SerializedComponentBatch::new(
3085 <MyPoint as Loggable>::to_arrow([
3086 MyPoint::new(10.0, 10.0),
3087 MyPoint::new(20.0, 20.0),
3088 ])
3089 .unwrap(),
3090 MyPoints::descriptor_points(),
3091 ),
3092 ), //
3093 (
3094 MyPoints::descriptor_colors().component,
3095 SerializedComponentBatch::new(
3096 <MyColor as Loggable>::to_arrow([MyColor(0x8080_80FF)]).unwrap(),
3097 MyPoints::descriptor_colors(),
3098 ),
3099 ), //
3100 (
3101 MyPoints::descriptor_labels().component,
3102 SerializedComponentBatch::new(
3103 <MyLabel as Loggable>::to_arrow([] as [MyLabel; 0]).unwrap(),
3104 MyPoints::descriptor_labels(),
3105 ),
3106 ), //
3107 ]
3108 .into_iter()
3109 .collect(),
3110 }
3111 };
3112
3113 let row1 = {
3114 PendingRow {
3115 row_id: RowId::new(),
3116 timepoint: timepoint(1),
3117 components: [
3118 (
3119 MyPoints::descriptor_points().component,
3120 SerializedComponentBatch::new(
3121 <MyPoint as Loggable>::to_arrow([] as [MyPoint; 0]).unwrap(),
3122 MyPoints::descriptor_points(),
3123 ),
3124 ), //
3125 (
3126 MyPoints::descriptor_colors().component,
3127 SerializedComponentBatch::new(
3128 <MyColor as Loggable>::to_arrow([] as [MyColor; 0]).unwrap(),
3129 MyPoints::descriptor_colors(),
3130 ),
3131 ), //
3132 (
3133 MyPoints::descriptor_labels().component,
3134 SerializedComponentBatch::new(
3135 <MyLabel as Loggable>::to_arrow([] as [MyLabel; 0]).unwrap(),
3136 MyPoints::descriptor_labels(),
3137 ),
3138 ), //
3139 ]
3140 .into_iter()
3141 .collect(),
3142 }
3143 };
3144
3145 let row2 = {
3146 PendingRow {
3147 row_id: RowId::new(),
3148 timepoint: timepoint(1),
3149 components: [
3150 (
3151 MyPoints::descriptor_points().component,
3152 SerializedComponentBatch::new(
3153 <MyPoint as Loggable>::to_arrow([] as [MyPoint; 0]).unwrap(),
3154 MyPoints::descriptor_points(),
3155 ),
3156 ), //
3157 (
3158 MyPoints::descriptor_colors().component,
3159 SerializedComponentBatch::new(
3160 <MyColor as Loggable>::to_arrow([MyColor(0xFFFF_FFFF)]).unwrap(),
3161 MyPoints::descriptor_colors(),
3162 ),
3163 ), //
3164 (
3165 MyPoints::descriptor_labels().component,
3166 SerializedComponentBatch::new(
3167 <MyLabel as Loggable>::to_arrow([MyLabel("hey".into())]).unwrap(),
3168 MyPoints::descriptor_labels(),
3169 ),
3170 ), //
3171 ]
3172 .into_iter()
3173 .collect(),
3174 }
3175 };
3176
3177 vec![row0, row1, row2]
3178 }
3179
3180 // See <https://github.com/rerun-io/rerun/pull/8587> for context.
3181 #[test]
3182 fn allows_componentbatch_unsized() {
3183 let labels = [
3184 MyLabel("a".into()),
3185 MyLabel("b".into()),
3186 MyLabel("c".into()),
3187 ];
3188
3189 let (rec, _mem) = RecordingStreamBuilder::new("rerun_example_test_componentbatch_unsized")
3190 .default_enabled(false)
3191 .enabled(false)
3192 .memory()
3193 .unwrap();
3194
3195 // This call used to *not* compile due to a lack of `?Sized` bounds.
3196 use re_sdk_types::ComponentBatch as _;
3197 rec.log(
3198 "labels",
3199 &labels
3200 .try_serialized(MyPoints::descriptor_labels())
3201 .unwrap(),
3202 )
3203 .unwrap();
3204 }
3205
3206 struct BatcherConfigTestSink {
3207 config: ChunkBatcherConfig,
3208 }
3209
3210 impl LogSink for BatcherConfigTestSink {
3211 fn default_batcher_config(&self) -> ChunkBatcherConfig {
3212 self.config
3213 }
3214
3215 fn send(&self, _msg: LogMsg) {
3216 // noop
3217 }
3218
3219 fn flush_blocking(&self, _timeout: Duration) -> Result<(), SinkFlushError> {
3220 Ok(())
3221 }
3222 }
3223
3224 struct ScopedEnvVarSet {
3225 key: &'static str,
3226 }
3227
3228 impl ScopedEnvVarSet {
3229 #[expect(unsafe_code)]
3230 fn new(key: &'static str, value: &'static str) -> Self {
3231 // SAFETY: only used in tests.
3232 unsafe { std::env::set_var(key, value) };
3233 Self { key }
3234 }
3235 }
3236
3237 impl Drop for ScopedEnvVarSet {
3238 #[expect(unsafe_code)]
3239 fn drop(&mut self) {
3240 // SAFETY: only used in tests.
3241 unsafe {
3242 std::env::remove_var(self.key);
3243 }
3244 }
3245 }
3246
3247 const CONFIG_CHANGE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
3248
3249 fn clear_environment() {
3250 // SAFETY: only used in tests.
3251 #[expect(unsafe_code)]
3252 unsafe {
3253 std::env::remove_var("RERUN_CHUNK_MAX_ROWS_IF_UNSORTED");
3254 std::env::remove_var("RERUN_FLUSH_NUM_BYTES");
3255 std::env::remove_var("RERUN_FLUSH_NUM_ROWS");
3256 std::env::remove_var("RERUN_FLUSH_TICK_SECS");
3257 std::env::remove_var("RERUN_MAX_CHUNK_ROWS_IF_UNSORTED");
3258 }
3259 }
3260
3261 #[test]
3262 fn test_sink_dependent_batcher_config() {
3263 clear_environment();
3264
3265 let (tx, rx) = crossbeam::channel::bounded(16);
3266
3267 let rec = RecordingStreamBuilder::new("rerun_example_test_batcher_config")
3268 .batcher_hooks(BatcherHooks {
3269 on_config_change: Some(Arc::new(move |config: &ChunkBatcherConfig| {
3270 re_quota_channel::send_crossbeam(&tx, *config).unwrap();
3271 })),
3272 ..BatcherHooks::NONE
3273 })
3274 .buffered()
3275 .unwrap();
3276
3277 let new_config = rx
3278 .recv_timeout(CONFIG_CHANGE_TIMEOUT)
3279 .expect("no config change message received within timeout");
3280 assert_eq!(
3281 new_config,
3282 ChunkBatcherConfig::from_env().unwrap(),
3283 "Buffered sink should uses the config from the environment"
3284 );
3285
3286 // Change sink to our custom sink. Will it take over the setting?
3287 let injected_config = ChunkBatcherConfig {
3288 flush_tick: std::time::Duration::from_secs(123),
3289 flush_num_bytes: 123,
3290 flush_num_rows: 123,
3291 ..new_config
3292 };
3293 rec.set_sink(Box::new(BatcherConfigTestSink {
3294 config: injected_config,
3295 }));
3296 let new_config = rx
3297 .recv_timeout(CONFIG_CHANGE_TIMEOUT)
3298 .expect("no config change message received within timeout");
3299
3300 assert_eq!(new_config, injected_config);
3301
3302 // Set flush num bytes through env var and set the sink again.
3303 // check that the env var is respected.
3304 let _scoped_env_guard = ScopedEnvVarSet::new("RERUN_FLUSH_NUM_BYTES", "456");
3305 rec.set_sink(Box::new(BatcherConfigTestSink {
3306 config: injected_config,
3307 }));
3308 let new_config = rx
3309 .recv_timeout(CONFIG_CHANGE_TIMEOUT)
3310 .expect("no config change message received within timeout");
3311 assert_eq!(
3312 new_config,
3313 ChunkBatcherConfig {
3314 flush_num_bytes: 456,
3315 ..injected_config
3316 },
3317 );
3318 }
3319
3320 #[test]
3321 fn test_explicit_batcher_config() {
3322 clear_environment();
3323
3324 // This environment variable should *not* override the explicit config.
3325 let _scoped_env_guard = ScopedEnvVarSet::new("RERUN_FLUSH_TICK_SECS", "456");
3326 let explicit_config = ChunkBatcherConfig {
3327 flush_tick: std::time::Duration::from_secs(123),
3328 flush_num_bytes: 123,
3329 flush_num_rows: 123,
3330 ..ChunkBatcherConfig::DEFAULT
3331 };
3332
3333 let (tx, rx) = crossbeam::channel::bounded(16);
3334 let rec = RecordingStreamBuilder::new("rerun_example_test_batcher_config")
3335 .batcher_config(explicit_config)
3336 .batcher_hooks(BatcherHooks {
3337 on_config_change: Some(Arc::new(move |config: &ChunkBatcherConfig| {
3338 re_quota_channel::send_crossbeam(&tx, *config).unwrap();
3339 })),
3340 ..BatcherHooks::NONE
3341 })
3342 .buffered()
3343 .unwrap();
3344
3345 let new_config = rx
3346 .recv_timeout(CONFIG_CHANGE_TIMEOUT)
3347 .expect("no config change message received within timeout");
3348 assert_eq!(new_config, explicit_config);
3349
3350 // Changing the sink should have no effect since an explicit config is in place.
3351 rec.set_sink(Box::new(BatcherConfigTestSink {
3352 config: ChunkBatcherConfig::ALWAYS_TEST_ONLY,
3353 }));
3354 // Don't want to stall the test for CONFIG_CHANGE_TIMEOUT here.
3355 let new_config_recv_result = rx.recv_timeout(std::time::Duration::from_millis(100));
3356 assert_eq!(
3357 new_config_recv_result,
3358 Err(crossbeam::channel::RecvTimeoutError::Timeout)
3359 );
3360 }
3361}