re_sdk/recording_stream.rs
1use std::fmt;
2use std::io::IsTerminal as _;
3use std::sync::atomic::AtomicI64;
4use std::sync::{Arc, Weak};
5use std::time::Duration;
6
7use ahash::HashMap;
8use crossbeam::channel::{Receiver, RecvTimeoutError, Sender};
9use itertools::Either;
10use nohash_hasher::IntMap;
11use parking_lot::Mutex;
12use re_chunk::{
13 BatcherFlushError, BatcherHooks, Chunk, ChunkBatcher, ChunkBatcherConfig, ChunkBatcherError,
14 ChunkComponents, ChunkError, ChunkId, PendingRow, RowId, TimeColumn,
15};
16use re_log_types::{
17 ApplicationId, ArrowRecordBatchReleaseCallback, BlueprintActivationCommand, EntityPath, LogMsg,
18 RecordingId, StoreId, StoreInfo, StoreKind, StoreSource, TimeCell, TimeInt, TimePoint,
19 Timeline, TimelineName,
20};
21use re_sdk_types::archetypes::RecordingInfo;
22use re_sdk_types::components::Timestamp;
23use re_sdk_types::{AsComponents, SerializationError, SerializedComponentColumn};
24
25use crate::binary_stream_sink::BinaryStreamStorage;
26use crate::sink::{LogSink, MemorySinkStorage, SinkFlushError};
27
28// ---
29
30/// Private environment variable meant for tests.
31///
32/// When set, all recording streams will write to disk at the path indicated by the env-var rather
33/// than doing what they were asked to do - `connect_grpc()`, `buffered()`, even `save()` will re-use the same sink.
34const ENV_FORCE_SAVE: &str = "_RERUN_TEST_FORCE_SAVE";
35
36/// Returns path for force sink if private environment variable `_RERUN_TEST_FORCE_SAVE` is set
37///
38/// Newly created [`RecordingStream`]s should use a [`crate::sink::FileSink`] pointing to this path.
39/// Furthermore, [`RecordingStream::set_sink`] calls after this should not swap out to a new sink but re-use the existing one.
40/// Note that creating a new [`crate::sink::FileSink`] to the same file path (even temporarily) can cause
41/// a race between file creation (and thus clearing) and pending file writes.
42pub fn forced_sink_path() -> Option<String> {
43 std::env::var(ENV_FORCE_SAVE).ok()
44}
45
46/// Errors that can occur when creating/manipulating a [`RecordingStream`].
47#[derive(thiserror::Error, Debug)]
48pub enum RecordingStreamError {
49 /// Error within the underlying file sink.
50 #[error("Failed to create the underlying file sink: {0}")]
51 FileSink(#[from] re_log_encoding::FileSinkError),
52
53 /// Error within the underlying chunk batcher.
54 #[error("Failed to convert data to a valid chunk: {0}")]
55 Chunk(#[from] ChunkError),
56
57 /// Error within the underlying chunk batcher.
58 #[error("Failed to spawn the underlying batcher: {0}")]
59 ChunkBatcher(#[from] ChunkBatcherError),
60
61 /// Error within the underlying serializer.
62 #[error("Failed to serialize component data: {0}")]
63 Serialization(#[from] SerializationError),
64
65 /// Error spawning one of the background threads.
66 #[error("Failed to spawn background thread '{name}': {err}")]
67 SpawnThread {
68 /// Name of the thread
69 name: String,
70
71 /// Inner error explaining why the thread failed to spawn.
72 err: std::io::Error,
73 },
74
75 /// Error spawning a Rerun Viewer process.
76 #[error(transparent)] // makes bubbling all the way up to main look nice
77 SpawnViewer(#[from] crate::SpawnError),
78
79 /// Failure to host a web viewer and/or Rerun server.
80 #[cfg(feature = "web_viewer")]
81 #[error(transparent)]
82 WebSink(#[from] crate::web_viewer::WebViewerSinkError),
83
84 /// An error occurred while attempting to use a [`re_data_loader::DataLoader`].
85 #[cfg(feature = "data_loaders")]
86 #[error(transparent)]
87 DataLoaderError(#[from] re_data_loader::DataLoaderError),
88
89 /// Invalid gRPC server address.
90 #[error(transparent)]
91 UriError(#[from] re_uri::Error),
92
93 /// Invalid endpoint
94 #[error("not a `/proxy` endpoint")]
95 NotAProxyEndpoint,
96
97 /// Invalid bind IP.
98 #[error(transparent)]
99 InvalidAddress(#[from] std::net::AddrParseError),
100}
101
102/// Results that can occur when creating/manipulating a [`RecordingStream`].
103pub type RecordingStreamResult<T> = Result<T, RecordingStreamError>;
104
105// ---
106
107/// Construct a [`RecordingStream`].
108///
109/// ``` no_run
110/// # use re_sdk::RecordingStreamBuilder;
111/// let rec = RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
112/// # Ok::<(), Box<dyn std::error::Error>>(())
113/// ```
114///
115/// Automatically sends a [`Chunk`] with the default [`RecordingInfo`] to
116/// the sink, unless an explicit `recording_id` is set via [`RecordingStreamBuilder::recording_id`].
117#[derive(Debug)]
118pub struct RecordingStreamBuilder {
119 application_id: ApplicationId,
120 store_kind: StoreKind,
121 recording_id: Option<RecordingId>,
122 store_source: Option<StoreSource>,
123
124 default_enabled: bool,
125 enabled: Option<bool>,
126
127 batcher_hooks: BatcherHooks,
128 batcher_config: Option<ChunkBatcherConfig>,
129
130 // Optional user-defined recording properties.
131 should_send_properties: bool,
132 recording_info: RecordingInfo,
133}
134
135impl RecordingStreamBuilder {
136 /// Create a new [`RecordingStreamBuilder`] with the given [`ApplicationId`].
137 ///
138 /// The [`ApplicationId`] is usually the name of your app.
139 ///
140 /// ```no_run
141 /// # use re_sdk::RecordingStreamBuilder;
142 /// let rec = RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
143 /// # Ok::<(), Box<dyn std::error::Error>>(())
144 /// ```
145 //
146 // NOTE: track_caller so that we can see if we are being called from an official example.
147 #[track_caller]
148 pub fn new(application_id: impl Into<ApplicationId>) -> Self {
149 let application_id = application_id.into();
150
151 Self {
152 application_id,
153 store_kind: StoreKind::Recording,
154 recording_id: None,
155 store_source: None,
156
157 default_enabled: true,
158 enabled: None,
159
160 batcher_config: None,
161 batcher_hooks: BatcherHooks::NONE,
162
163 should_send_properties: true,
164 recording_info: RecordingInfo::new()
165 .with_start_time(re_sdk_types::components::Timestamp::now()),
166 }
167 }
168
169 /// Create a new [`RecordingStreamBuilder`] with the given [`StoreId`].
170 //
171 // NOTE: track_caller so that we can see if we are being called from an official example.
172 #[track_caller]
173 pub fn from_store_id(store_id: &StoreId) -> Self {
174 Self {
175 application_id: store_id.application_id().clone(),
176 store_kind: store_id.kind(),
177 recording_id: Some(store_id.recording_id().clone()),
178 store_source: None,
179
180 default_enabled: true,
181 enabled: None,
182
183 batcher_config: None,
184 batcher_hooks: BatcherHooks::NONE,
185
186 should_send_properties: true,
187 recording_info: RecordingInfo::new()
188 .with_start_time(re_sdk_types::components::Timestamp::now()),
189 }
190 }
191
192 /// Set whether or not Rerun is enabled by default.
193 ///
194 /// If the `RERUN` environment variable is set, it will override this.
195 ///
196 /// Set also: [`Self::enabled`].
197 #[inline]
198 pub fn default_enabled(mut self, default_enabled: bool) -> Self {
199 self.default_enabled = default_enabled;
200 self
201 }
202
203 /// Set whether or not Rerun is enabled.
204 ///
205 /// Setting this will ignore the `RERUN` environment variable.
206 ///
207 /// Set also: [`Self::default_enabled`].
208 #[inline]
209 pub fn enabled(mut self, enabled: bool) -> Self {
210 self.enabled = Some(enabled);
211 self
212 }
213
214 /// Set the `RecordingId` for this context.
215 ///
216 /// If you're logging from multiple processes and want all the messages to end up in the same
217 /// recording, you must make sure that they all set the same `RecordingId` using this function.
218 ///
219 /// Note that many stores can share the same [`ApplicationId`], but they all have
220 /// unique `RecordingId`s.
221 ///
222 /// The default is to use a random `RecordingId`.
223 ///
224 /// When explicitly setting a `RecordingId`, the initial chunk that contains the recording
225 /// properties will not be sent.
226 #[inline]
227 pub fn recording_id(mut self, recording_id: impl Into<RecordingId>) -> Self {
228 self.recording_id = Some(recording_id.into());
229 self.send_properties(false)
230 }
231
232 /// Sets an optional name for the recording.
233 #[inline]
234 pub fn recording_name(mut self, name: impl Into<String>) -> Self {
235 self.recording_info = self.recording_info.with_name(name.into());
236 self
237 }
238
239 /// Sets an optional name for the recording.
240 #[inline]
241 pub fn recording_started(mut self, started: impl Into<Timestamp>) -> Self {
242 self.recording_info = self.recording_info.with_start_time(started);
243 self
244 }
245
246 /// Whether the [`RecordingInfo`] chunk should be sent.
247 #[inline]
248 pub fn send_properties(mut self, should_send: bool) -> Self {
249 self.should_send_properties = should_send;
250 self
251 }
252
253 /// Specifies the configuration of the internal data batching mechanism.
254 ///
255 /// If not set, the default configuration for the currently active sink will be used.
256 /// Any environment variables as specified on [`ChunkBatcherConfig`] will always override respective settings.
257 ///
258 /// See [`ChunkBatcher`] & [`ChunkBatcherConfig`] for more information.
259 #[inline]
260 pub fn batcher_config(mut self, config: ChunkBatcherConfig) -> Self {
261 self.batcher_config = Some(config);
262 self
263 }
264
265 /// Specifies callbacks for the batcher thread.
266 ///
267 /// See [`ChunkBatcher`] & [`BatcherHooks`] for more information.
268 #[inline]
269 pub fn batcher_hooks(mut self, hooks: BatcherHooks) -> Self {
270 self.batcher_hooks = hooks;
271 self
272 }
273
274 #[doc(hidden)]
275 #[inline]
276 pub fn store_source(mut self, store_source: StoreSource) -> Self {
277 self.store_source = Some(store_source);
278 self
279 }
280
281 #[doc(hidden)]
282 #[inline]
283 pub fn blueprint(mut self) -> Self {
284 self.store_kind = StoreKind::Blueprint;
285 self
286 }
287
288 /// Creates a new [`RecordingStream`] that starts in a buffering state (RAM).
289 ///
290 /// ## Example
291 ///
292 /// ```
293 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").buffered()?;
294 /// # Ok::<(), Box<dyn std::error::Error>>(())
295 /// ```
296 pub fn buffered(self) -> RecordingStreamResult<RecordingStream> {
297 let sink = crate::log_sink::BufferedSink::new();
298 let (enabled, store_info, properties, batcher_config, batcher_hooks) = self.into_args();
299 if enabled {
300 RecordingStream::new(
301 store_info,
302 properties,
303 batcher_config,
304 batcher_hooks,
305 Box::new(sink),
306 )
307 } else {
308 re_log::debug!("Rerun disabled - call to buffered() ignored");
309 Ok(RecordingStream::disabled())
310 }
311 }
312
313 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
314 /// [`crate::log_sink::MemorySink`].
315 ///
316 /// ## Example
317 ///
318 /// ```
319 /// # fn log_data(_: &re_sdk::RecordingStream) { }
320 ///
321 /// let (rec, storage) = re_sdk::RecordingStreamBuilder::new("rerun_example_app").memory()?;
322 ///
323 /// log_data(&rec);
324 ///
325 /// let data = storage.take();
326 ///
327 /// # Ok::<(), Box<dyn std::error::Error>>(())
328 /// ```
329 pub fn memory(
330 self,
331 ) -> RecordingStreamResult<(RecordingStream, crate::log_sink::MemorySinkStorage)> {
332 let (enabled, store_info, properties, batcher_config, batcher_hooks) = self.into_args();
333 let rec = if enabled {
334 RecordingStream::new(
335 store_info,
336 properties,
337 batcher_config,
338 batcher_hooks,
339 Box::new(crate::log_sink::BufferedSink::new()),
340 )
341 } else {
342 re_log::debug!("Rerun disabled - call to memory() ignored");
343 Ok(RecordingStream::disabled())
344 }?;
345
346 let sink = crate::log_sink::MemorySink::new(rec.clone());
347 let storage = sink.buffer();
348 // Using set_sink here is necessary because the MemorySink needs to know
349 // it's own RecordingStream, which means we can't use `new` above.
350 // This has the downside of a bit of creation overhead and an extra StoreInfo
351 // message being sent to the sink.
352 // TODO(jleibs): Figure out a cleaner way to handle this.
353 rec.set_sink(Box::new(sink));
354 Ok((rec, storage))
355 }
356
357 /// Creates a new [`RecordingStream`] pre-configured to stream data to multiple sinks.
358 ///
359 /// Currently only supports [`GrpcSink`][grpc_sink] and [`FileSink`][file_sink].
360 ///
361 /// If the batcher configuration has not been set explicitly or by environment variables,
362 /// this will change the batcher configuration to a conservative (less often flushing) mix of
363 /// default configurations of the underlying sinks.
364 ///
365 /// [grpc_sink]: crate::sink::GrpcSink
366 /// [file_sink]: crate::sink::FileSink
367 pub fn set_sinks(
368 self,
369 sinks: impl crate::sink::IntoMultiSink,
370 ) -> RecordingStreamResult<RecordingStream> {
371 let (enabled, store_info, properties, batcher_config, batcher_hooks) = self.into_args();
372 if enabled {
373 RecordingStream::new(
374 store_info,
375 properties,
376 batcher_config,
377 batcher_hooks,
378 Box::new(sinks.into_multi_sink()),
379 )
380 } else {
381 re_log::debug!("Rerun disabled - call to set_sinks() ignored");
382 Ok(RecordingStream::disabled())
383 }
384 }
385
386 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
387 /// remote Rerun instance.
388 ///
389 /// See also [`Self::connect_grpc_opts`] if you wish to configure the connection.
390 ///
391 /// ## Example
392 ///
393 /// ```no_run
394 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").connect_grpc()?;
395 /// # Ok::<(), Box<dyn std::error::Error>>(())
396 /// ```
397 pub fn connect_grpc(self) -> RecordingStreamResult<RecordingStream> {
398 self.connect_grpc_opts(format!(
399 "rerun+http://127.0.0.1:{}/proxy",
400 re_grpc_server::DEFAULT_SERVER_PORT
401 ))
402 }
403
404 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
405 /// remote Rerun instance.
406 ///
407 /// ## Example
408 ///
409 /// ```no_run
410 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
411 /// .connect_grpc_opts("rerun+http://127.0.0.1:9876/proxy")?;
412 /// # Ok::<(), Box<dyn std::error::Error>>(())
413 /// ```
414 pub fn connect_grpc_opts(
415 self,
416 url: impl Into<String>,
417 ) -> RecordingStreamResult<RecordingStream> {
418 let (enabled, store_info, properties, batcher_config, batcher_hooks) = self.into_args();
419 if enabled {
420 let url: String = url.into();
421 let re_uri::RedapUri::Proxy(uri) = url.as_str().parse()? else {
422 return Err(RecordingStreamError::NotAProxyEndpoint);
423 };
424
425 RecordingStream::new(
426 store_info,
427 properties,
428 batcher_config,
429 batcher_hooks,
430 Box::new(crate::log_sink::GrpcSink::new(uri)),
431 )
432 } else {
433 re_log::debug!("Rerun disabled - call to connect() ignored");
434 Ok(RecordingStream::disabled())
435 }
436 }
437
438 #[cfg(feature = "server")]
439 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
440 /// locally hosted gRPC server.
441 ///
442 /// The server is hosted on the default IP and port, and may be connected to by any SDK or Viewer
443 /// at `rerun+http://127.0.0.1:9876/proxy` or by just running `rerun --connect`.
444 ///
445 /// To configure the gRPC server's IP and port, use [`Self::serve_grpc_opts`] instead.
446 ///
447 /// The gRPC server will buffer in memory so that late connecting viewers will still get all the data.
448 /// You can limit the amount of data buffered by the gRPC server using [`Self::serve_grpc_opts`].
449 /// Once the memory limit is reached, the earliest logged data
450 /// will be dropped. Static data is never dropped.
451 ///
452 /// It is highly recommended that you use [`Self::serve_grpc_opts`] and set the memory limit to `0B`
453 /// if both the server and client are running on the same machine, otherwise you're potentially
454 /// doubling your memory usage!
455 pub fn serve_grpc(self) -> RecordingStreamResult<RecordingStream> {
456 use re_grpc_server::ServerOptions;
457
458 self.serve_grpc_opts(
459 "0.0.0.0",
460 crate::DEFAULT_SERVER_PORT,
461 ServerOptions {
462 memory_limit: re_memory::MemoryLimit::from_fraction_of_total(0.25),
463 ..Default::default()
464 },
465 )
466 }
467
468 #[cfg(feature = "server")]
469 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
470 /// locally hosted gRPC server.
471 ///
472 /// The server is hosted on the given `bind_ip` and `port`, may be connected to by any SDK or Viewer
473 /// at `rerun+http://{bind_ip}:{port}/proxy`.
474 ///
475 /// `0.0.0.0` is a good default for `bind_ip`.
476 ///
477 /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
478 /// You can limit the amount of data buffered by the gRPC server with the `server_options` argument.
479 /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
480 ///
481 /// If server & client are running on the same machine and all clients are expected to connect before
482 /// any data is sent, it is highly recommended that you set the memory limit to `0B`,
483 /// otherwise you're potentially doubling your memory usage!
484 pub fn serve_grpc_opts(
485 self,
486 bind_ip: impl AsRef<str>,
487 port: u16,
488 server_options: re_grpc_server::ServerOptions,
489 ) -> RecordingStreamResult<RecordingStream> {
490 let (enabled, store_info, properties, batcher_config, batcher_hooks) = self.into_args();
491 if enabled {
492 RecordingStream::new(
493 store_info,
494 properties,
495 batcher_config,
496 batcher_hooks,
497 Box::new(crate::grpc_server::GrpcServerSink::new(
498 bind_ip.as_ref(),
499 port,
500 server_options,
501 )?),
502 )
503 } else {
504 re_log::debug!("Rerun disabled - call to serve_grpc() ignored");
505 Ok(RecordingStream::disabled())
506 }
507 }
508
509 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to an
510 /// RRD file on disk.
511 ///
512 /// The Rerun Viewer is able to read continuously from the resulting rrd file while it is being written.
513 /// However, depending on your OS and configuration, changes may not be immediately visible due to file caching.
514 /// This is a common issue on Windows and (to a lesser extent) on `MacOS`.
515 ///
516 /// ## Example
517 ///
518 /// ```no_run
519 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
520 /// # Ok::<(), Box<dyn std::error::Error>>(())
521 /// ```
522 #[cfg(not(target_arch = "wasm32"))]
523 pub fn save(
524 self,
525 path: impl Into<std::path::PathBuf>,
526 ) -> RecordingStreamResult<RecordingStream> {
527 let (enabled, store_info, properties, batcher_config, batcher_hooks) = self.into_args();
528
529 if enabled {
530 RecordingStream::new(
531 store_info,
532 properties,
533 batcher_config,
534 batcher_hooks,
535 Box::new(crate::sink::FileSink::new(path)?),
536 )
537 } else {
538 re_log::debug!("Rerun disabled - call to save() ignored");
539 Ok(RecordingStream::disabled())
540 }
541 }
542
543 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to stdout.
544 ///
545 /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
546 /// default back to `buffered` mode, in order not to break the user's terminal.
547 ///
548 /// ## Example
549 ///
550 /// ```no_run
551 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").stdout()?;
552 /// # Ok::<(), Box<dyn std::error::Error>>(())
553 /// ```
554 #[cfg(not(target_arch = "wasm32"))]
555 pub fn stdout(self) -> RecordingStreamResult<RecordingStream> {
556 if std::io::stdout().is_terminal() {
557 re_log::debug!("Ignored call to stdout() because stdout is a terminal");
558 return self.buffered();
559 }
560
561 let (enabled, store_info, properties, batcher_config, batcher_hooks) = self.into_args();
562
563 if enabled {
564 RecordingStream::new(
565 store_info,
566 properties,
567 batcher_config,
568 batcher_hooks,
569 Box::new(crate::sink::FileSink::stdout()?),
570 )
571 } else {
572 re_log::debug!("Rerun disabled - call to stdout() ignored");
573 Ok(RecordingStream::disabled())
574 }
575 }
576
577 /// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
578 /// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over gRPC.
579 ///
580 /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
581 /// that viewer instead of starting a new one.
582 ///
583 /// See also [`Self::spawn_opts`] if you wish to configure the behavior of thew Rerun process
584 /// as well as the underlying connection.
585 ///
586 /// ## Example
587 ///
588 /// ```no_run
589 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").spawn()?;
590 /// # Ok::<(), Box<dyn std::error::Error>>(())
591 /// ```
592 pub fn spawn(self) -> RecordingStreamResult<RecordingStream> {
593 self.spawn_opts(&Default::default())
594 }
595
596 /// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
597 /// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over gRPC.
598 ///
599 /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
600 /// that viewer instead of starting a new one.
601 ///
602 /// The behavior of the spawned Viewer can be configured via `opts`.
603 /// If you're fine with the default behavior, refer to the simpler [`Self::spawn`].
604 ///
605 /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
606 /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
607 /// call to `flush` to block indefinitely if a connection cannot be established.
608 ///
609 /// ## Example
610 ///
611 /// ```no_run
612 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
613 /// .spawn_opts(&re_sdk::SpawnOptions::default())?;
614 /// # Ok::<(), Box<dyn std::error::Error>>(())
615 /// ```
616 pub fn spawn_opts(self, opts: &crate::SpawnOptions) -> RecordingStreamResult<RecordingStream> {
617 if !self.is_enabled() {
618 re_log::debug!("Rerun disabled - call to spawn() ignored");
619 return Ok(RecordingStream::disabled());
620 }
621
622 let url = format!("rerun+http://{}/proxy", opts.connect_addr());
623
624 // NOTE: If `_RERUN_TEST_FORCE_SAVE` is set, all recording streams will write to disk no matter
625 // what, thus spawning a viewer is pointless (and probably not intended).
626 if forced_sink_path().is_some() {
627 return self.connect_grpc_opts(url);
628 }
629
630 crate::spawn(opts)?;
631
632 self.connect_grpc_opts(url)
633 }
634
635 /// Returns whether or not logging is enabled, a [`StoreInfo`] and the associated batcher
636 /// configuration.
637 ///
638 /// This can be used to then construct a [`RecordingStream`] manually using
639 /// [`RecordingStream::new`].
640 pub fn into_args(
641 self,
642 ) -> (
643 bool,
644 StoreInfo,
645 Option<RecordingInfo>,
646 Option<ChunkBatcherConfig>,
647 BatcherHooks,
648 ) {
649 let enabled = self.is_enabled();
650
651 let Self {
652 application_id,
653 store_kind,
654 recording_id,
655 store_source,
656 default_enabled: _,
657 enabled: _,
658 batcher_config,
659 batcher_hooks,
660 should_send_properties,
661 recording_info,
662 } = self;
663
664 let store_id = StoreId::new(
665 store_kind,
666 application_id,
667 recording_id.unwrap_or_else(RecordingId::random),
668 );
669 let store_source = store_source.unwrap_or_else(|| StoreSource::RustSdk {
670 rustc_version: env!("RE_BUILD_RUSTC_VERSION").into(),
671 llvm_version: env!("RE_BUILD_LLVM_VERSION").into(),
672 });
673
674 let store_info = StoreInfo::new(store_id, store_source);
675
676 (
677 enabled,
678 store_info,
679 should_send_properties.then_some(recording_info),
680 batcher_config,
681 batcher_hooks,
682 )
683 }
684
685 /// Internal check for whether or not logging is enabled using explicit/default settings & env var.
686 fn is_enabled(&self) -> bool {
687 self.enabled
688 .unwrap_or_else(|| crate::decide_logging_enabled(self.default_enabled))
689 }
690}
691
692// ----------------------------------------------------------------------------
693
694/// A [`RecordingStream`] handles everything related to logging data into Rerun.
695///
696/// You can construct a new [`RecordingStream`] using [`RecordingStreamBuilder`] or
697/// [`RecordingStream::new`].
698///
699/// ## Sinks
700///
701/// Data is logged into Rerun via [`LogSink`]s.
702///
703/// The underlying [`LogSink`] of a [`RecordingStream`] can be changed at any point during its
704/// lifetime by calling [`RecordingStream::set_sink`] or one of the higher level helpers
705/// ([`RecordingStream::connect_grpc`], [`RecordingStream::memory`],
706/// [`RecordingStream::save`], [`RecordingStream::disconnect`]).
707///
708/// See [`RecordingStream::set_sink`] for more information.
709///
710/// ## Multithreading and ordering
711///
712/// [`RecordingStream`] can be cheaply cloned and used freely across any number of threads.
713///
714/// Internally, all operations are linearized into a pipeline:
715/// - All operations sent by a given thread will take effect in the same exact order as that
716/// thread originally sent them in, from its point of view.
717/// - There isn't any well defined global order across multiple threads.
718///
719/// This means that e.g. flushing the pipeline ([`Self::flush_blocking`]) guarantees that all previous data sent by the calling thread
720/// has been recorded and (if applicable) flushed to the underlying OS-managed file descriptor,
721/// but other threads may still have data in flight.
722///
723/// ## Shutdown
724///
725/// The [`RecordingStream`] can only be shutdown by dropping all instances of it, at which point
726/// it will automatically take care of flushing any pending data that might remain in the pipeline.
727///
728/// Shutting down cannot ever block.
729#[derive(Clone)]
730pub struct RecordingStream {
731 inner: Either<Arc<RecordingStreamInner>, Weak<RecordingStreamInner>>,
732}
733
734impl RecordingStream {
735 /// Passes a reference to the [`RecordingStreamInner`], if it exists.
736 ///
737 /// This works whether the underlying stream is strong or weak.
738 #[inline]
739 fn with<F: FnOnce(&RecordingStreamInner) -> R, R>(&self, f: F) -> Option<R> {
740 match &self.inner {
741 Either::Left(strong) => Some(f(strong)),
742 Either::Right(weak) => Some(f(&*weak.upgrade()?)),
743 }
744 }
745
746 /// Clones the [`RecordingStream`] without incrementing the refcount.
747 ///
748 /// Useful e.g. if you want to make sure that a detached thread won't prevent the [`RecordingStream`]
749 /// from flushing during shutdown.
750 //
751 // TODO(#5335): shutdown flushing behavior is too brittle.
752 #[inline]
753 pub fn clone_weak(&self) -> Self {
754 Self {
755 inner: match &self.inner {
756 Either::Left(strong) => Either::Right(Arc::downgrade(strong)),
757 Either::Right(weak) => Either::Right(Weak::clone(weak)),
758 },
759 }
760 }
761
762 /// Returns the current reference count of the [`RecordingStream`].
763 ///
764 /// Returns 0 if the stream was created by [`RecordingStream::disabled()`],
765 /// or if it is a [`clone_weak()`][Self::clone_weak] of a stream whose strong instances
766 /// have all been dropped.
767 pub fn ref_count(&self) -> usize {
768 match &self.inner {
769 Either::Left(strong) => Arc::strong_count(strong),
770 Either::Right(weak) => weak.strong_count(),
771 }
772 }
773}
774
775// TODO(#5335): shutdown flushing behavior is too brittle.
776impl Drop for RecordingStream {
777 #[inline]
778 fn drop(&mut self) {
779 // If this holds the last strong handle to the recording, make sure that all pending
780 // `DataLoader` threads that were started from the SDK actually run to completion (they
781 // all hold a weak handle to this very recording!).
782 //
783 // NOTE: It's very important to do so from the `Drop` implementation of `RecordingStream`
784 // itself, because the dataloader threads -- by definition -- will have to send data into
785 // this very recording, therefore we must make sure that at least one strong handle still lives
786 // on until they are all finished.
787 if let Either::Left(strong) = &mut self.inner
788 && Arc::strong_count(strong) == 1
789 {
790 // Keep the recording alive until all dataloaders are finished.
791 self.with(|inner| inner.wait_for_dataloaders());
792 }
793 }
794}
795
796struct RecordingStreamInner {
797 store_info: StoreInfo,
798 recording_info: Option<RecordingInfo>,
799 tick: AtomicI64,
800
801 /// The one and only entrypoint into the pipeline: this is _never_ cloned nor publicly exposed,
802 /// therefore the `Drop` implementation is guaranteed that no more data can come in while it's
803 /// running.
804 cmds_tx: Sender<Command>,
805
806 batcher: ChunkBatcher,
807 batcher_to_sink_handle: Option<std::thread::JoinHandle<()>>,
808
809 /// It true, any new sink will update the batcher's configuration (as far as possible).
810 sink_dependent_batcher_config: bool,
811
812 /// Keeps track of the top-level threads that were spawned in order to execute the `DataLoader`
813 /// machinery in the context of this `RecordingStream`.
814 ///
815 /// See [`RecordingStream::log_file_from_path`] and [`RecordingStream::log_file_from_contents`].
816 dataloader_handles: Mutex<Vec<std::thread::JoinHandle<()>>>,
817
818 pid_at_creation: u32,
819}
820
821impl fmt::Debug for RecordingStreamInner {
822 #[inline]
823 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
824 f.debug_struct("RecordingStreamInner")
825 .field("store_id", &self.store_info.store_id)
826 .finish()
827 }
828}
829
830impl Drop for RecordingStreamInner {
831 fn drop(&mut self) {
832 if self.is_forked_child() {
833 re_log::error_once!(
834 "Fork detected while dropping RecordingStreamInner. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
835 );
836 return;
837 }
838
839 self.wait_for_dataloaders();
840
841 // NOTE: The command channel is private, if we're here, nothing is currently capable of
842 // sending data down the pipeline.
843 let timeout = Duration::MAX;
844 if let Err(err) = self.batcher.flush_blocking(timeout) {
845 re_log::error!("Failed to flush batcher: {err}");
846 }
847 self.cmds_tx.send(Command::PopPendingChunks).ok();
848 self.cmds_tx.send(Command::Shutdown).ok();
849 if let Some(handle) = self.batcher_to_sink_handle.take() {
850 handle.join().ok();
851 }
852 }
853}
854
855fn resolve_batcher_config(
856 batcher_config: Option<ChunkBatcherConfig>,
857 sink: &dyn LogSink,
858) -> ChunkBatcherConfig {
859 if let Some(explicit_batcher_config) = batcher_config {
860 explicit_batcher_config
861 } else {
862 let default_config = sink.default_batcher_config();
863 default_config.apply_env().unwrap_or_else(|err| {
864 re_log::error!("Failed to parse ChunkBatcherConfig from env: {}", err);
865 default_config
866 })
867 }
868}
869
870impl RecordingStreamInner {
871 fn new(
872 store_info: StoreInfo,
873 recording_info: Option<RecordingInfo>,
874 batcher_config: Option<ChunkBatcherConfig>,
875 batcher_hooks: BatcherHooks,
876 sink: Box<dyn LogSink>,
877 ) -> RecordingStreamResult<Self> {
878 let sink_dependent_batcher_config = batcher_config.is_none();
879 let batcher_config = resolve_batcher_config(batcher_config, &*sink);
880
881 let on_release = batcher_hooks.on_release.clone();
882 let batcher = ChunkBatcher::new(batcher_config, batcher_hooks)?;
883
884 {
885 re_log::debug!(
886 store_id = ?store_info.store_id,
887 "Setting StoreInfo",
888 );
889 sink.send(
890 re_log_types::SetStoreInfo {
891 row_id: *RowId::new(),
892 info: store_info.clone(),
893 }
894 .into(),
895 );
896 }
897
898 let (cmds_tx, cmds_rx) = crossbeam::channel::unbounded();
899
900 let batcher_to_sink_handle = {
901 const NAME: &str = "RecordingStream::batcher_to_sink";
902 std::thread::Builder::new()
903 .name(NAME.into())
904 .spawn({
905 let info = store_info.clone();
906 let batcher = batcher.clone();
907 move || forwarding_thread(info, sink, cmds_rx, batcher.chunks(), on_release)
908 })
909 .map_err(|err| RecordingStreamError::SpawnThread {
910 name: NAME.into(),
911 err,
912 })?
913 };
914
915 if let Some(recording_info) = recording_info.as_ref() {
916 // We pre-populate the batcher with a chunk the contains the `RecordingInfo`
917 // so that these get automatically sent to the sink.
918
919 re_log::trace!(recording_info = ?recording_info, "Adding RecordingInfo to batcher");
920
921 let chunk = Chunk::builder(EntityPath::properties())
922 .with_archetype(RowId::new(), TimePoint::default(), recording_info)
923 .build()?;
924
925 batcher.push_chunk(chunk);
926 }
927
928 Ok(Self {
929 store_info,
930 recording_info,
931 tick: AtomicI64::new(0),
932 cmds_tx,
933 batcher,
934 batcher_to_sink_handle: Some(batcher_to_sink_handle),
935 sink_dependent_batcher_config,
936 dataloader_handles: Mutex::new(Vec::new()),
937 pid_at_creation: std::process::id(),
938 })
939 }
940
941 #[inline]
942 pub fn is_forked_child(&self) -> bool {
943 self.pid_at_creation != std::process::id()
944 }
945
946 /// Make sure all pending top-level `DataLoader` threads that were started from the SDK run to completion.
947 //
948 // TODO(cmc): At some point we might want to make it configurable, though I cannot really
949 // think of a use case where you'd want to drop those threads immediately upon
950 // disconnection.
951 fn wait_for_dataloaders(&self) {
952 let dataloader_handles = std::mem::take(&mut *self.dataloader_handles.lock());
953 for handle in dataloader_handles {
954 handle.join().ok();
955 }
956 }
957}
958
959type InspectSinkFn = Box<dyn FnOnce(&dyn LogSink) + Send + 'static>;
960
961type FlushResult = Result<(), SinkFlushError>;
962
963enum Command {
964 RecordMsg(LogMsg),
965 SwapSink {
966 new_sink: Box<dyn LogSink>,
967 timeout: Duration,
968 },
969 // TODO(#10444): This should go away with more explicit sinks.
970 InspectSink(InspectSinkFn),
971 Flush {
972 on_done: Sender<FlushResult>,
973 timeout: Duration,
974 },
975 PopPendingChunks,
976 Shutdown,
977}
978
979impl Command {
980 fn flush(timeout: Duration) -> (Self, Receiver<FlushResult>) {
981 let (on_done, rx) = crossbeam::channel::bounded(1); // oneshot
982 (Self::Flush { on_done, timeout }, rx)
983 }
984}
985
986impl RecordingStream {
987 /// Creates a new [`RecordingStream`] with a given [`StoreInfo`] and [`LogSink`].
988 ///
989 /// You can create a [`StoreInfo`] with [`crate::new_store_info`];
990 ///
991 /// The [`StoreInfo`] is immediately sent to the sink in the form of a
992 /// [`re_log_types::SetStoreInfo`].
993 ///
994 /// You can find sinks in [`crate::sink`].
995 ///
996 /// If no batcher configuration is provided, the default batcher configuration for the sink will be used.
997 /// Any environment variables as specified in [`ChunkBatcherConfig`] will always override respective settings.
998 ///
999 /// See also: [`RecordingStreamBuilder`].
1000 #[must_use = "Recording will get closed automatically once all instances of this object have been dropped"]
1001 pub fn new(
1002 store_info: StoreInfo,
1003 recording_info: Option<RecordingInfo>,
1004 batcher_config: Option<ChunkBatcherConfig>,
1005 batcher_hooks: BatcherHooks,
1006 sink: Box<dyn LogSink>,
1007 ) -> RecordingStreamResult<Self> {
1008 let sink = store_info
1009 .store_id
1010 .is_recording()
1011 .then(forced_sink_path)
1012 .flatten()
1013 .map_or(sink, |path| {
1014 re_log::info!("Forcing FileSink because of env-var {ENV_FORCE_SAVE}={path:?}");
1015 Box::new(
1016 crate::sink::FileSink::new(path)
1017 .expect("Failed to create FileSink for forced test path"),
1018 ) as Box<dyn LogSink>
1019 });
1020
1021 let stream = RecordingStreamInner::new(
1022 store_info,
1023 recording_info,
1024 batcher_config,
1025 batcher_hooks,
1026 sink,
1027 )
1028 .map(|inner| Self {
1029 inner: Either::Left(Arc::new(inner)),
1030 })?;
1031
1032 Ok(stream)
1033 }
1034
1035 /// Creates a new no-op [`RecordingStream`] that drops all logging messages, doesn't allocate
1036 /// any memory and doesn't spawn any threads.
1037 ///
1038 /// [`Self::is_enabled`] will return `false`.
1039 pub const fn disabled() -> Self {
1040 Self {
1041 inner: Either::Right(Weak::new()),
1042 }
1043 }
1044}
1045
1046impl RecordingStream {
1047 /// Log data to Rerun.
1048 ///
1049 /// This is the main entry point for logging data to rerun. It can be used to log anything
1050 /// that implements the [`AsComponents`], such as any [archetype](https://docs.rs/rerun/latest/rerun/archetypes/index.html)
1051 /// or individual [component](https://docs.rs/rerun/latest/rerun/components/index.html).
1052 ///
1053 /// The data will be timestamped automatically based on the [`RecordingStream`]'s internal clock.
1054 /// See [`RecordingStream::set_time_sequence`] etc for more information.
1055 ///
1056 /// The entity path can either be a string
1057 /// (with special characters escaped, split on unescaped slashes)
1058 /// or an [`EntityPath`] constructed with [`crate::entity_path`].
1059 /// See <https://www.rerun.io/docs/concepts/entity-path> for more on entity paths.
1060 ///
1061 /// See also: [`Self::log_static`] for logging static data.
1062 ///
1063 /// Internally, the stream will automatically micro-batch multiple log calls to optimize
1064 /// transport.
1065 /// See [SDK Micro Batching] for more information.
1066 ///
1067 /// # Example:
1068 /// ```ignore
1069 /// # use rerun;
1070 /// # let (rec, storage) = rerun::RecordingStreamBuilder::new("rerun_example_points3d_simple").memory()?;
1071 /// rec.log(
1072 /// "my/points",
1073 /// &rerun::Points3D::new([(0.0, 0.0, 0.0), (1.0, 1.0, 1.0)]),
1074 /// )?;
1075 /// # Ok::<(), Box<dyn std::error::Error>>(())
1076 /// ```
1077 ///
1078 /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
1079 /// [component bundle]: [`AsComponents`]
1080 #[inline]
1081 pub fn log<AS: ?Sized + AsComponents>(
1082 &self,
1083 ent_path: impl Into<EntityPath>,
1084 as_components: &AS,
1085 ) -> RecordingStreamResult<()> {
1086 self.log_with_static(ent_path, false, as_components)
1087 }
1088
1089 /// Lower-level logging API to provide data spanning multiple timepoints.
1090 ///
1091 /// Unlike the regular `log` API, which is row-oriented, this API lets you submit the data
1092 /// in a columnar form. The lengths of all of the [`TimeColumn`] and the component columns
1093 /// must match. All data that occurs at the same index across the different index/time and components
1094 /// arrays will act as a single logical row.
1095 ///
1096 /// Note that this API ignores any stateful index/time set on the log stream via the
1097 /// [`Self::set_time`]/[`Self::set_timepoint`]/etc. APIs.
1098 /// Furthermore, this will _not_ inject the default timelines `log_tick` and `log_time` timeline columns.
1099 pub fn send_columns(
1100 &self,
1101 ent_path: impl Into<EntityPath>,
1102 indexes: impl IntoIterator<Item = TimeColumn>,
1103 columns: impl IntoIterator<Item = SerializedComponentColumn>,
1104 ) -> RecordingStreamResult<()> {
1105 let id = ChunkId::new();
1106
1107 let indexes = indexes
1108 .into_iter()
1109 .map(|col| (*col.timeline().name(), col))
1110 .collect();
1111
1112 let components: ChunkComponents = columns
1113 .into_iter()
1114 .map(|column| (column.descriptor, column.list_array))
1115 .collect();
1116
1117 let chunk = Chunk::from_auto_row_ids(id, ent_path.into(), indexes, components)?;
1118
1119 self.send_chunk(chunk);
1120
1121 Ok(())
1122 }
1123
1124 /// Log data to Rerun.
1125 ///
1126 /// It can be used to log anything
1127 /// that implements the [`AsComponents`], such as any [archetype](https://docs.rs/rerun/latest/rerun/archetypes/index.html)
1128 /// or individual [component](https://docs.rs/rerun/latest/rerun/components/index.html).
1129 ///
1130 /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
1131 /// any temporal data of the same type.
1132 /// All timestamp data associated with this message will be dropped right before sending it to Rerun.
1133 ///
1134 /// This is most often used for [`rerun::ViewCoordinates`](https://docs.rs/rerun/latest/rerun/archetypes/struct.ViewCoordinates.html) and
1135 /// [`rerun::AnnotationContext`](https://docs.rs/rerun/latest/rerun/archetypes/struct.AnnotationContext.html).
1136 ///
1137 /// Internally, the stream will automatically micro-batch multiple log calls to optimize
1138 /// transport.
1139 /// See [SDK Micro Batching] for more information.
1140 ///
1141 /// See also [`Self::log`].
1142 ///
1143 /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
1144 /// [component bundle]: [`AsComponents`]
1145 #[inline]
1146 pub fn log_static<AS: ?Sized + AsComponents>(
1147 &self,
1148 ent_path: impl Into<EntityPath>,
1149 as_components: &AS,
1150 ) -> RecordingStreamResult<()> {
1151 self.log_with_static(ent_path, true, as_components)
1152 }
1153
1154 /// Logs the contents of a [component bundle] into Rerun.
1155 ///
1156 /// If `static_` is set to `true`, all timestamp data associated with this message will be
1157 /// dropped right before sending it to Rerun.
1158 /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
1159 /// any temporal data of the same type.
1160 ///
1161 /// Otherwise, the data will be timestamped automatically based on the [`RecordingStream`]'s
1162 /// internal clock.
1163 /// See `RecordingStream::set_time_*` family of methods for more information.
1164 ///
1165 /// The entity path can either be a string
1166 /// (with special characters escaped, split on unescaped slashes)
1167 /// or an [`EntityPath`] constructed with [`crate::entity_path`].
1168 /// See <https://www.rerun.io/docs/concepts/entity-path> for more on entity paths.
1169 ///
1170 /// Internally, the stream will automatically micro-batch multiple log calls to optimize
1171 /// transport.
1172 /// See [SDK Micro Batching] for more information.
1173 ///
1174 /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
1175 /// [component bundle]: [`AsComponents`]
1176 #[inline]
1177 pub fn log_with_static<AS: ?Sized + AsComponents>(
1178 &self,
1179 ent_path: impl Into<EntityPath>,
1180 static_: bool,
1181 as_components: &AS,
1182 ) -> RecordingStreamResult<()> {
1183 let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
1184 self.log_serialized_batches_impl(
1185 row_id,
1186 ent_path,
1187 static_,
1188 as_components.as_serialized_batches(),
1189 )
1190 }
1191
1192 /// Logs a set of [`SerializedComponentBatch`]es into Rerun.
1193 ///
1194 /// If `static_` is set to `true`, all timestamp data associated with this message will be
1195 /// dropped right before sending it to Rerun.
1196 /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
1197 /// any temporal data of the same type.
1198 ///
1199 /// Otherwise, the data will be timestamped automatically based on the [`RecordingStream`]'s
1200 /// internal clock.
1201 /// See `RecordingStream::set_time_*` family of methods for more information.
1202 ///
1203 /// The number of instances will be determined by the longest batch in the bundle.
1204 ///
1205 /// The entity path can either be a string
1206 /// (with special characters escaped, split on unescaped slashes)
1207 /// or an [`EntityPath`] constructed with [`crate::entity_path`].
1208 /// See <https://www.rerun.io/docs/concepts/entity-path> for more on entity paths.
1209 ///
1210 /// Internally, the stream will automatically micro-batch multiple log calls to optimize
1211 /// transport.
1212 /// See [SDK Micro Batching] for more information.
1213 ///
1214 /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
1215 ///
1216 /// [`SerializedComponentBatch`]: [re_types_core::SerializedComponentBatch]
1217 pub fn log_serialized_batches(
1218 &self,
1219 ent_path: impl Into<EntityPath>,
1220 static_: bool,
1221 comp_batches: impl IntoIterator<Item = re_sdk_types::SerializedComponentBatch>,
1222 ) -> RecordingStreamResult<()> {
1223 let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
1224 self.log_serialized_batches_impl(row_id, ent_path, static_, comp_batches)
1225 }
1226
1227 /// Sends a property to the recording.
1228 #[inline]
1229 pub fn send_property<AS: ?Sized + AsComponents>(
1230 &self,
1231 name: impl Into<String>,
1232 values: &AS,
1233 ) -> RecordingStreamResult<()> {
1234 let sub_path = EntityPath::from(name.into());
1235 self.log_static(EntityPath::properties().join(&sub_path), values)
1236 }
1237
1238 /// Sends the name of the recording.
1239 #[inline]
1240 pub fn send_recording_name(&self, name: impl Into<String>) -> RecordingStreamResult<()> {
1241 let update = RecordingInfo::update_fields().with_name(name.into());
1242 self.log_static(EntityPath::properties(), &update)
1243 }
1244
1245 /// Sends the start time of the recording.
1246 #[inline]
1247 pub fn send_recording_start_time(
1248 &self,
1249 timestamp: impl Into<Timestamp>,
1250 ) -> RecordingStreamResult<()> {
1251 let update = RecordingInfo::update_fields().with_start_time(timestamp.into());
1252 self.log_static(EntityPath::properties(), &update)
1253 }
1254
1255 // NOTE: For bw and fw compatibility reasons, we need our logging APIs to be fallible, even
1256 // though they really aren't at the moment.
1257 #[expect(clippy::unnecessary_wraps)]
1258 fn log_serialized_batches_impl(
1259 &self,
1260 row_id: RowId,
1261 entity_path: impl Into<EntityPath>,
1262 static_: bool,
1263 comp_batches: impl IntoIterator<Item = re_sdk_types::SerializedComponentBatch>,
1264 ) -> RecordingStreamResult<()> {
1265 if !self.is_enabled() {
1266 return Ok(()); // silently drop the message
1267 }
1268
1269 let entity_path = entity_path.into();
1270
1271 let components: IntMap<_, _> = comp_batches
1272 .into_iter()
1273 .map(|comp_batch| (comp_batch.descriptor.component, comp_batch))
1274 .collect();
1275
1276 // NOTE: The timepoint is irrelevant, the `RecordingStream` will overwrite it using its
1277 // internal clock.
1278 let timepoint = TimePoint::default();
1279
1280 if !components.is_empty() {
1281 let row = PendingRow {
1282 row_id,
1283 timepoint,
1284 components,
1285 };
1286 self.record_row(entity_path, row, !static_);
1287 }
1288
1289 Ok(())
1290 }
1291
1292 /// Logs the file at the given `path` using all [`re_data_loader::DataLoader`]s available.
1293 ///
1294 /// A single `path` might be handled by more than one loader.
1295 ///
1296 /// This method blocks until either at least one [`re_data_loader::DataLoader`] starts
1297 /// streaming data in or all of them fail.
1298 ///
1299 /// See <https://www.rerun.io/docs/reference/data-loaders/overview> for more information.
1300 #[cfg(feature = "data_loaders")]
1301 pub fn log_file_from_path(
1302 &self,
1303 filepath: impl AsRef<std::path::Path>,
1304 entity_path_prefix: Option<EntityPath>,
1305 static_: bool,
1306 ) -> RecordingStreamResult<()> {
1307 self.log_file(filepath, None, entity_path_prefix, static_, true)
1308 }
1309
1310 /// Logs the given `contents` using all [`re_data_loader::DataLoader`]s available.
1311 ///
1312 /// A single `path` might be handled by more than one loader.
1313 ///
1314 /// This method blocks until either at least one [`re_data_loader::DataLoader`] starts
1315 /// streaming data in or all of them fail.
1316 ///
1317 /// See <https://www.rerun.io/docs/reference/data-loaders/overview> for more information.
1318 #[cfg(feature = "data_loaders")]
1319 pub fn log_file_from_contents(
1320 &self,
1321 filepath: impl AsRef<std::path::Path>,
1322 contents: std::borrow::Cow<'_, [u8]>,
1323 entity_path_prefix: Option<EntityPath>,
1324 static_: bool,
1325 ) -> RecordingStreamResult<()> {
1326 self.log_file(filepath, Some(contents), entity_path_prefix, static_, true)
1327 }
1328
1329 /// If `prefer_current_recording` is set (which is always the case for now), the dataloader settings
1330 /// will be configured as if the current SDK recording is the currently opened recording.
1331 /// Most dataloaders prefer logging to the currently opened recording if one is set.
1332 #[cfg(feature = "data_loaders")]
1333 fn log_file(
1334 &self,
1335 filepath: impl AsRef<std::path::Path>,
1336 contents: Option<std::borrow::Cow<'_, [u8]>>,
1337 entity_path_prefix: Option<EntityPath>,
1338 static_: bool,
1339 prefer_current_recording: bool,
1340 ) -> RecordingStreamResult<()> {
1341 let Some(store_info) = self.store_info().clone() else {
1342 re_log::warn!(
1343 "Ignored call to log_file() because RecordingStream has not been properly initialized"
1344 );
1345 return Ok(());
1346 };
1347
1348 let filepath = filepath.as_ref();
1349 let has_contents = contents.is_some();
1350
1351 let (tx, rx) =
1352 re_log_channel::log_channel(re_log_channel::LogSource::File(filepath.into()));
1353
1354 let mut settings = crate::DataLoaderSettings {
1355 application_id: Some(store_info.application_id().clone()),
1356 recording_id: store_info.recording_id().clone(),
1357 opened_store_id: None,
1358 force_store_info: false,
1359 entity_path_prefix,
1360 timepoint: (!static_).then(|| {
1361 self.with(|inner| {
1362 // Get the current time on all timelines, for the current recording, on the current
1363 // thread…
1364 let mut now = self.now();
1365
1366 // …and then also inject the current recording tick into it.
1367 let tick = inner
1368 .tick
1369 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1370 now.insert_cell(TimelineName::log_tick(), TimeCell::from_sequence(tick));
1371
1372 now
1373 })
1374 .unwrap_or_default()
1375 }),
1376 };
1377
1378 if prefer_current_recording {
1379 settings.opened_store_id = Some(store_info.store_id);
1380 }
1381
1382 if let Some(contents) = contents {
1383 re_data_loader::load_from_file_contents(
1384 &settings,
1385 re_log_types::FileSource::Sdk,
1386 filepath,
1387 contents,
1388 &tx,
1389 )?;
1390 } else {
1391 re_data_loader::load_from_path(
1392 &settings,
1393 re_log_types::FileSource::Sdk,
1394 filepath,
1395 &tx,
1396 )?;
1397 }
1398 drop(tx);
1399
1400 // We can safely ignore the error on `recv()` as we're in complete control of both ends of
1401 // the channel.
1402 let thread_name = if has_contents {
1403 format!("log_file_from_contents({filepath:?})")
1404 } else {
1405 format!("log_file_from_path({filepath:?})")
1406 };
1407 let handle = std::thread::Builder::new()
1408 .name(thread_name.clone())
1409 .spawn({
1410 let this = self.clone_weak();
1411 move || {
1412 while let Some(msg) = rx.recv().ok().and_then(|msg| msg.into_data()) {
1413 match msg {
1414 re_log_channel::DataSourceMessage::LogMsg(log_msg) => {
1415 this.record_msg(log_msg);
1416 }
1417 unsupported => {
1418 re_log::error_once!(
1419 "Ignoring unexpected {} in file",
1420 unsupported.variant_name()
1421 );
1422 }
1423 }
1424 }
1425 }
1426 })
1427 .map_err(|err| RecordingStreamError::SpawnThread {
1428 name: thread_name,
1429 err,
1430 })?;
1431
1432 self.with(|inner| inner.dataloader_handles.lock().push(handle));
1433
1434 Ok(())
1435 }
1436}
1437
1438#[expect(clippy::needless_pass_by_value)]
1439fn forwarding_thread(
1440 store_info: StoreInfo,
1441 mut sink: Box<dyn LogSink>,
1442 cmds_rx: Receiver<Command>,
1443 chunks: Receiver<Chunk>,
1444 on_release: Option<ArrowRecordBatchReleaseCallback>,
1445) {
1446 /// Returns `true` to indicate that processing can continue; i.e. `false` means immediate
1447 /// shutdown.
1448 fn handle_cmd(store_info: &StoreInfo, cmd: Command, sink: &mut Box<dyn LogSink>) -> bool {
1449 match cmd {
1450 Command::RecordMsg(msg) => {
1451 sink.send(msg);
1452 }
1453 Command::SwapSink { new_sink, timeout } => {
1454 re_log::trace!("Swapping sink…");
1455
1456 let backlog = {
1457 // Capture the backlog if it exists.
1458 let backlog = sink.drain_backlog();
1459
1460 // Flush the underlying sink if possible.
1461 if let Err(err) = sink.flush_blocking(timeout) {
1462 re_log::error!("Failed to flush previous sink: {err}");
1463 }
1464
1465 backlog
1466 };
1467
1468 // Send the recording info to the new sink. This is idempotent.
1469 {
1470 re_log::debug!(
1471 store_id = ?store_info.store_id,
1472 "Setting StoreInfo",
1473 );
1474 new_sink.send(
1475 re_log_types::SetStoreInfo {
1476 row_id: *RowId::new(),
1477 info: store_info.clone(),
1478 }
1479 .into(),
1480 );
1481 new_sink.send_all(backlog);
1482 }
1483
1484 *sink = new_sink;
1485 }
1486 Command::InspectSink(f) => {
1487 f(sink.as_ref());
1488 }
1489 Command::Flush { on_done, timeout } => {
1490 re_log::trace!("Flushing…");
1491
1492 let result = sink.flush_blocking(timeout);
1493
1494 // Send back the result:
1495 if let Err(crossbeam::channel::SendError(result)) = on_done.send(result)
1496 && let Err(err) = result
1497 {
1498 // There was an error, and nobody received it:
1499 re_log::error!("Failed to flush sink: {err}");
1500 }
1501 }
1502 Command::PopPendingChunks => {
1503 // Wake up and skip the current iteration so that we can drain all pending chunks
1504 // before handling the next command.
1505 }
1506 Command::Shutdown => return false,
1507 }
1508
1509 true
1510 }
1511
1512 use crossbeam::select;
1513 loop {
1514 // NOTE: Always pop chunks first, this is what makes `Command::PopPendingChunks` possible,
1515 // which in turns makes `RecordingStream::flush_blocking` well defined.
1516 while let Ok(chunk) = chunks.try_recv() {
1517 let mut msg = match chunk.to_arrow_msg() {
1518 Ok(chunk) => chunk,
1519 Err(err) => {
1520 re_log::error!(%err, "couldn't serialize chunk; data dropped (this is a bug in Rerun!)");
1521 continue;
1522 }
1523 };
1524 msg.on_release = on_release.clone();
1525 sink.send(LogMsg::ArrowMsg(store_info.store_id.clone(), msg));
1526 }
1527
1528 select! {
1529 recv(chunks) -> res => {
1530 let Ok(chunk) = res else {
1531 // The batcher is gone, which can only happen if the `RecordingStream` itself
1532 // has been dropped.
1533 re_log::trace!("Shutting down forwarding_thread: batcher is gone");
1534 break;
1535 };
1536
1537 let msg = match chunk.to_arrow_msg() {
1538 Ok(chunk) => chunk,
1539 Err(err) => {
1540 re_log::error!(%err, "couldn't serialize chunk; data dropped (this is a bug in Rerun!)");
1541 continue;
1542 }
1543 };
1544
1545 sink.send(LogMsg::ArrowMsg(store_info.store_id.clone(), msg));
1546 }
1547
1548 recv(cmds_rx) -> res => {
1549 let Ok(cmd) = res else {
1550 // All command senders are gone, which can only happen if the
1551 // `RecordingStream` itself has been dropped.
1552 re_log::trace!("Shutting down forwarding_thread: all command senders are gone");
1553 break;
1554 };
1555 if !handle_cmd(&store_info, cmd, &mut sink) {
1556 break; // shutdown
1557 }
1558 }
1559 }
1560
1561 // NOTE: The receiving end of the command stream is owned solely by this thread.
1562 // Past this point, all command writes will return `ErrDisconnected`.
1563 }
1564}
1565
1566impl RecordingStream {
1567 /// Check if logging is enabled on this `RecordingStream`.
1568 ///
1569 /// If not, all recording calls will be ignored.
1570 #[inline]
1571 pub fn is_enabled(&self) -> bool {
1572 self.with(|_| true).unwrap_or(false)
1573 }
1574
1575 /// The [`StoreInfo`] associated with this `RecordingStream`.
1576 #[inline]
1577 pub fn store_info(&self) -> Option<StoreInfo> {
1578 self.with(|inner| inner.store_info.clone())
1579 }
1580
1581 /// Determine whether a fork has happened since creating this `RecordingStream`. In general, this means our
1582 /// batcher/sink threads are gone and all data logged since the fork has been dropped.
1583 ///
1584 /// It is essential that [`crate::cleanup_if_forked_child`] be called after forking the process. SDK-implementations
1585 /// should do this during their initialization phase.
1586 #[inline]
1587 pub fn is_forked_child(&self) -> bool {
1588 self.with(|inner| inner.is_forked_child()).unwrap_or(false)
1589 }
1590}
1591
1592impl RecordingStream {
1593 /// Records an arbitrary [`LogMsg`].
1594 #[inline]
1595 pub fn record_msg(&self, msg: LogMsg) {
1596 let f = move |inner: &RecordingStreamInner| {
1597 // NOTE: Internal channels can never be closed outside of the `Drop` impl, this send cannot
1598 // fail.
1599 inner.cmds_tx.send(Command::RecordMsg(msg)).ok();
1600 inner
1601 .tick
1602 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1603 };
1604
1605 if self.with(f).is_none() {
1606 re_log::warn_once!("Recording disabled - call to record_msg() ignored");
1607 }
1608 }
1609
1610 /// Records a single [`PendingRow`].
1611 ///
1612 /// If `inject_time` is set to `true`, the row's timestamp data will be overridden using the
1613 /// [`RecordingStream`]'s internal clock.
1614 ///
1615 /// Internally, incoming [`PendingRow`]s are automatically coalesced into larger [`Chunk`]s to
1616 /// optimize for transport.
1617 #[inline]
1618 pub fn record_row(&self, entity_path: EntityPath, mut row: PendingRow, inject_time: bool) {
1619 let f = move |inner: &RecordingStreamInner| {
1620 // NOTE: We're incrementing the current tick still.
1621 let tick = inner
1622 .tick
1623 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1624 if inject_time {
1625 // Get the current time on all timelines, for the current recording, on the current
1626 // thread…
1627 let mut now = self.now();
1628 // …and then also inject the current recording tick into it.
1629 now.insert_cell(TimelineName::log_tick(), TimeCell::from_sequence(tick));
1630
1631 // Inject all these times into the row, overriding conflicting times, if any.
1632 for (timeline, cell) in now {
1633 row.timepoint.insert_cell(timeline, cell);
1634 }
1635 }
1636
1637 inner.batcher.push_row(entity_path, row);
1638 };
1639
1640 if self.with(f).is_none() {
1641 re_log::warn_once!("Recording disabled - call to record_row() ignored");
1642 }
1643 }
1644
1645 /// Logs a single [`Chunk`].
1646 ///
1647 /// Will inject `log_tick` and `log_time` timeline columns into the chunk.
1648 /// If you don't want to inject these, use [`Self::send_chunk`] instead.
1649 #[inline]
1650 pub fn log_chunk(&self, mut chunk: Chunk) {
1651 let f = move |inner: &RecordingStreamInner| {
1652 // TODO(cmc): Repeating these values is pretty wasteful. Would be nice to have a way of
1653 // indicating these are fixed across the whole chunk.
1654 // Inject the log time
1655 {
1656 let time_timeline = Timeline::log_time();
1657 let time =
1658 TimeInt::new_temporal(re_log_types::Timestamp::now().nanos_since_epoch());
1659
1660 let repeated_time = std::iter::repeat_n(time.as_i64(), chunk.num_rows()).collect();
1661
1662 let time_column = TimeColumn::new(Some(true), time_timeline, repeated_time);
1663
1664 if let Err(err) = chunk.add_timeline(time_column) {
1665 re_log::error!(
1666 "Couldn't inject '{}' timeline into chunk (this is a bug in Rerun!): {}",
1667 time_timeline.name(),
1668 err
1669 );
1670 return;
1671 }
1672 }
1673 // Inject the log tick
1674 {
1675 let tick_timeline = Timeline::log_tick();
1676
1677 let tick = inner
1678 .tick
1679 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1680
1681 let repeated_tick = std::iter::repeat_n(tick, chunk.num_rows()).collect();
1682
1683 let tick_chunk = TimeColumn::new(Some(true), tick_timeline, repeated_tick);
1684
1685 if let Err(err) = chunk.add_timeline(tick_chunk) {
1686 re_log::error!(
1687 "Couldn't inject '{}' timeline into chunk (this is a bug in Rerun!): {}",
1688 tick_timeline.name(),
1689 err
1690 );
1691 return;
1692 }
1693 }
1694
1695 inner.batcher.push_chunk(chunk);
1696 };
1697
1698 if self.with(f).is_none() {
1699 re_log::warn_once!("Recording disabled - call to log_chunk() ignored");
1700 }
1701 }
1702
1703 /// Logs multiple [`Chunk`]s.
1704 ///
1705 /// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunk,
1706 /// for that use [`Self::log_chunks`].
1707 pub fn log_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
1708 for chunk in chunks {
1709 self.log_chunk(chunk);
1710 }
1711 }
1712
1713 /// Records a single [`Chunk`].
1714 ///
1715 /// Will inject `log_tick` and `log_time` timeline columns into the chunk.
1716 /// If you don't want to inject these, use [`Self::send_chunks`] instead.
1717 #[inline]
1718 pub fn send_chunk(&self, chunk: Chunk) {
1719 let f = move |inner: &RecordingStreamInner| {
1720 inner.batcher.push_chunk(chunk);
1721 };
1722
1723 if self.with(f).is_none() {
1724 re_log::warn_once!("Recording disabled - call to send_chunk() ignored");
1725 }
1726 }
1727
1728 /// Records multiple [`Chunk`]s.
1729 ///
1730 /// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunk,
1731 /// for that use [`Self::log_chunks`].
1732 pub fn send_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
1733 for chunk in chunks {
1734 self.send_chunk(chunk);
1735 }
1736 }
1737
1738 /// Swaps the underlying sink for a new one.
1739 ///
1740 /// This guarantees that:
1741 /// 1. all pending rows and chunks are batched, collected and sent down the current sink,
1742 /// 2. the current sink is flushed if it has pending data in its buffers,
1743 /// 3. the current sink's backlog, if there's any, is forwarded to the new sink.
1744 ///
1745 /// When this function returns, the calling thread is guaranteed that all future record calls
1746 /// will end up in the new sink.
1747 ///
1748 /// If the batcher's configuration has not been set explicitly or by environment variables,
1749 /// this will change the batcher configuration to the sink's default configuration.
1750 ///
1751 /// ## Data loss
1752 ///
1753 /// If the current sink is in a broken state (e.g. a gRPC sink with a broken connection that
1754 /// cannot be repaired), all pending data in its buffers will be dropped.
1755 pub fn set_sink(&self, new_sink: Box<dyn LogSink>) {
1756 if self.is_forked_child() {
1757 re_log::error_once!(
1758 "Fork detected during set_sink. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
1759 );
1760 return;
1761 }
1762
1763 let timeout = Duration::MAX; // The background thread should block forever if necessary
1764
1765 let f = move |inner: &RecordingStreamInner| {
1766 // NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends
1767 // are safe.
1768
1769 // Flush the batcher down the chunk channel
1770 if let Err(err) = inner.batcher.flush_blocking(timeout) {
1771 re_log::warn!("Failed to flush batcher in `set_sink`: {err}");
1772 }
1773
1774 // Receive pending chunks from the batcher's channel
1775 inner.cmds_tx.send(Command::PopPendingChunks).ok();
1776
1777 // Update the batcher's configuration if it's sink-dependent.
1778 if inner.sink_dependent_batcher_config {
1779 let batcher_config = resolve_batcher_config(None, &*new_sink);
1780 inner.batcher.update_config(batcher_config);
1781 }
1782
1783 // Swap the sink, which will internally make sure to re-ingest the backlog if needed
1784 inner
1785 .cmds_tx
1786 .send(Command::SwapSink { new_sink, timeout })
1787 .ok();
1788
1789 // Before we give control back to the caller, we need to make sure that the swap has
1790 // taken place: we don't want the user to send data to the old sink!
1791 re_log::trace!("Waiting for sink swap to complete…");
1792 let (cmd, oneshot) = Command::flush(timeout);
1793 inner.cmds_tx.send(cmd).ok();
1794 oneshot.recv().ok();
1795 re_log::trace!("Sink swap completed.");
1796 };
1797
1798 if self.with(f).is_none() {
1799 re_log::warn_once!("Recording disabled - call to set_sink() ignored");
1800 }
1801 }
1802
1803 /// Initiates a flush of the pipeline and returns immediately.
1804 ///
1805 /// This does **not** wait for the flush to propagate (see [`Self::flush_blocking`]).
1806 /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
1807 ///
1808 /// This will never return [`SinkFlushError::Timeout`].
1809 pub fn flush_async(&self) -> Result<(), SinkFlushError> {
1810 re_tracing::profile_function!();
1811 match self.flush(None) {
1812 Err(SinkFlushError::Timeout) => Ok(()),
1813 result => result,
1814 }
1815 }
1816
1817 /// Flush the batching pipeline and waits for it to propagate.
1818 ///
1819 /// The function will block until either the flush has completed successfully (`Ok`),
1820 /// an error has occurred (`SinkFlushError::Failed`), or the timeout is reached (`SinkFlushError::Timeout`).
1821 ///
1822 /// Convenience for calling [`Self::flush_with_timeout`] with a timeout of [`Duration::MAX`]
1823 pub fn flush_blocking(&self) -> Result<(), SinkFlushError> {
1824 re_tracing::profile_function!();
1825 self.flush_with_timeout(Duration::MAX)
1826 }
1827
1828 /// Flush the batching pipeline and optionally waits for it to propagate.
1829 /// If you don't want a timeout you can pass in [`Duration::MAX`].
1830 ///
1831 /// The function will block until that timeout is reached,
1832 /// an error occurs, or the flush is complete.
1833 /// The function will only block while there is some hope of progress.
1834 /// For instance: if the underlying gRPC connection is disconnected (or never connected at all),
1835 /// then [`SinkFlushError::Failed`] is returned.
1836 ///
1837 /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
1838 pub fn flush_with_timeout(&self, timeout: Duration) -> Result<(), SinkFlushError> {
1839 re_tracing::profile_function!();
1840 self.flush(Some(timeout))
1841 }
1842
1843 /// Flush the batching pipeline and optionally waits for it to propagate.
1844 ///
1845 /// If `timeout` is `None`, then this function will start the flush, but NOT wait for it to finish.
1846 ///
1847 /// If a `timeout` is given, then the function will block until that timeout is reached,
1848 /// an error occurs, or the flush is complete.
1849 ///
1850 /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
1851 fn flush(&self, timeout: Option<Duration>) -> Result<(), SinkFlushError> {
1852 if self.is_forked_child() {
1853 return Err(SinkFlushError::failed(
1854 "Fork detected during flush. cleanup_if_forked() should always be called after forking. This is likely a bug in the Rerun SDK.",
1855 ));
1856 }
1857
1858 let f = move |inner: &RecordingStreamInner| -> Result<(), SinkFlushError> {
1859 // 0. Wait for all pending data loader threads to complete
1860 //
1861 // This ensures that data from `log_file_from_path` and `log_file_from_contents`
1862 // is fully loaded before we flush the batcher and sink.
1863 inner.wait_for_dataloaders();
1864
1865 // 1. Synchronously flush the batcher down the chunk channel
1866 //
1867 // NOTE: This _has_ to be done synchronously as we need to be guaranteed that all chunks
1868 // are ready to be drained by the time this call returns.
1869 // It cannot block indefinitely and is fairly fast as it only requires compute (no I/O).
1870 inner
1871 .batcher
1872 .flush_blocking(Duration::MAX)
1873 .map_err(|err| match err {
1874 BatcherFlushError::Closed => SinkFlushError::failed(err.to_string()),
1875 BatcherFlushError::Timeout => SinkFlushError::Timeout,
1876 })?;
1877
1878 // 2. Drain all pending chunks from the batcher's channel _before_ any other future command
1879 inner
1880 .cmds_tx
1881 .send(Command::PopPendingChunks)
1882 .map_err(|_ignored| {
1883 SinkFlushError::failed(
1884 "Sink shut down prematurely. This is likely a bug in the Rerun SDK.",
1885 )
1886 })?;
1887
1888 // 3. Asynchronously flush everything down the sink
1889 let (cmd, on_done) = Command::flush(Duration::MAX); // The background thread should block forever if necessary
1890 inner.cmds_tx.send(cmd).map_err(|_ignored| {
1891 SinkFlushError::failed(
1892 "Sink shut down prematurely. This is likely a bug in the Rerun SDK.",
1893 )
1894 })?;
1895
1896 if let Some(timeout) = timeout {
1897 on_done.recv_timeout(timeout).map_err(|err| match err {
1898 RecvTimeoutError::Timeout => SinkFlushError::Timeout,
1899 RecvTimeoutError::Disconnected => SinkFlushError::failed(
1900 "Flush never finished. This is likely a bug in the Rerun SDK.",
1901 ),
1902 })??;
1903 }
1904
1905 Ok(())
1906 };
1907
1908 match self.with(f) {
1909 Some(Ok(())) => Ok(()),
1910 Some(Err(err)) => Err(err),
1911 None => {
1912 re_log::warn_once!("Recording disabled - call to flush ignored");
1913 Ok(())
1914 }
1915 }
1916 }
1917}
1918
1919impl RecordingStream {
1920 /// Stream data to multiple different sinks.
1921 ///
1922 /// This is semantically the same as calling [`RecordingStream::set_sink`], but the resulting
1923 /// [`RecordingStream`] will now stream data to multiple sinks at the same time.
1924 ///
1925 /// Currently only supports [`GrpcSink`][grpc_sink] and [`FileSink`][file_sink].
1926 ///
1927 /// If the batcher's configuration has not been set explicitly or by environment variables,
1928 /// This will take over a conservative default of the new sinks.
1929 /// (there's no guarantee on when exactly the new configuration will be active)
1930 ///
1931 /// [grpc_sink]: crate::sink::GrpcSink
1932 /// [file_sink]: crate::sink::FileSink
1933 pub fn set_sinks(&self, sinks: impl crate::log_sink::IntoMultiSink) {
1934 if forced_sink_path().is_some() {
1935 re_log::debug!("Ignored setting new MultiSink since {ENV_FORCE_SAVE} is set");
1936 return;
1937 }
1938
1939 let sink = sinks.into_multi_sink();
1940
1941 self.set_sink(Box::new(sink));
1942 }
1943
1944 /// Asynchronously calls a method that has read access to the currently active sink.
1945 ///
1946 /// Since a recording stream's sink is owned by a different thread there is no guarantee when
1947 /// the callback is going to be called.
1948 /// It's advised to return as quickly as possible from the callback since
1949 /// as long as the callback doesn't return, the sink will not receive any new data,
1950 ///
1951 /// # Experimental
1952 ///
1953 /// This is an experimental API and may change in future releases.
1954 // TODO(#10444): This should become a lot more straight forward with explicit sinks.
1955 pub fn inspect_sink(&self, f: impl FnOnce(&dyn LogSink) + Send + 'static) {
1956 self.with(|inner| inner.cmds_tx.send(Command::InspectSink(Box::new(f))).ok());
1957 }
1958
1959 /// Swaps the underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to use
1960 /// the specified address.
1961 ///
1962 /// See also [`Self::connect_grpc_opts`] if you wish to configure the connection.
1963 ///
1964 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
1965 /// terms of data durability and ordering.
1966 /// See [`Self::set_sink`] for more information.
1967 pub fn connect_grpc(&self) -> RecordingStreamResult<()> {
1968 self.connect_grpc_opts(format!(
1969 "rerun+http://127.0.0.1:{}/proxy",
1970 re_grpc_server::DEFAULT_SERVER_PORT
1971 ))
1972 }
1973
1974 /// Swaps the underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to use
1975 /// the specified address.
1976 ///
1977 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
1978 /// terms of data durability and ordering.
1979 /// See [`Self::set_sink`] for more information.
1980 ///
1981 /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
1982 /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
1983 /// call to `flush` to block indefinitely if a connection cannot be established.
1984 pub fn connect_grpc_opts(&self, url: impl Into<String>) -> RecordingStreamResult<()> {
1985 if forced_sink_path().is_some() {
1986 re_log::debug!("Ignored setting new GrpcSink since {ENV_FORCE_SAVE} is set");
1987 return Ok(());
1988 }
1989
1990 let url: String = url.into();
1991 let re_uri::RedapUri::Proxy(uri) = url.as_str().parse()? else {
1992 return Err(RecordingStreamError::NotAProxyEndpoint);
1993 };
1994
1995 let sink = crate::log_sink::GrpcSink::new(uri);
1996
1997 self.set_sink(Box::new(sink));
1998 Ok(())
1999 }
2000
2001 #[cfg(feature = "server")]
2002 /// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] pre-configured to listen on
2003 /// `rerun+http://127.0.0.1:9876/proxy`.
2004 ///
2005 /// To configure the gRPC server's IP and port, use [`Self::serve_grpc_opts`] instead.
2006 ///
2007 /// You can connect a viewer to it with `rerun --connect`.
2008 ///
2009 /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
2010 /// You can limit the amount of data buffered by the gRPC server with the `server_options` argument.
2011 /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
2012 pub fn serve_grpc(
2013 &self,
2014 server_options: re_grpc_server::ServerOptions,
2015 ) -> RecordingStreamResult<()> {
2016 self.serve_grpc_opts("0.0.0.0", crate::DEFAULT_SERVER_PORT, server_options)
2017 }
2018
2019 #[cfg(feature = "server")]
2020 /// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] pre-configured to listen on
2021 /// `rerun+http://{bind_ip}:{port}/proxy`.
2022 ///
2023 /// `0.0.0.0` is a good default for `bind_ip`.
2024 ///
2025 /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
2026 /// You can limit the amount of data buffered by the gRPC server with the `server_options` argument.
2027 /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
2028 pub fn serve_grpc_opts(
2029 &self,
2030 bind_ip: impl AsRef<str>,
2031 port: u16,
2032 server_options: re_grpc_server::ServerOptions,
2033 ) -> RecordingStreamResult<()> {
2034 if forced_sink_path().is_some() {
2035 re_log::debug!("Ignored setting GrpcServerSink since {ENV_FORCE_SAVE} is set");
2036 return Ok(());
2037 }
2038
2039 let sink = crate::grpc_server::GrpcServerSink::new(bind_ip.as_ref(), port, server_options)?;
2040
2041 self.set_sink(Box::new(sink));
2042 Ok(())
2043 }
2044
2045 /// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
2046 /// underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to send data to that
2047 /// new process.
2048 ///
2049 /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
2050 /// that viewer instead of starting a new one.
2051 ///
2052 /// See also [`Self::spawn_opts`] if you wish to configure the behavior of thew Rerun process
2053 /// as well as the underlying connection.
2054 ///
2055 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2056 /// terms of data durability and ordering.
2057 /// See [`Self::set_sink`] for more information.
2058 pub fn spawn(&self) -> RecordingStreamResult<()> {
2059 self.spawn_opts(&Default::default())
2060 }
2061
2062 /// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
2063 /// underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to send data to that
2064 /// new process.
2065 ///
2066 /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
2067 /// that viewer instead of starting a new one.
2068 ///
2069 /// The behavior of the spawned Viewer can be configured via `opts`.
2070 /// If you're fine with the default behavior, refer to the simpler [`Self::spawn`].
2071 ///
2072 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2073 /// terms of data durability and ordering.
2074 /// See [`Self::set_sink`] for more information.
2075 ///
2076 /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
2077 /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
2078 /// call to `flush` to block indefinitely if a connection cannot be established.
2079 pub fn spawn_opts(&self, opts: &crate::SpawnOptions) -> RecordingStreamResult<()> {
2080 if !self.is_enabled() {
2081 re_log::debug!("Rerun disabled - call to spawn() ignored");
2082 return Ok(());
2083 }
2084 if forced_sink_path().is_some() {
2085 re_log::debug!("Ignored setting new GrpcSink since {ENV_FORCE_SAVE} is set");
2086 return Ok(());
2087 }
2088
2089 crate::spawn(opts)?;
2090
2091 self.connect_grpc_opts(format!("rerun+http://{}/proxy", opts.connect_addr()))?;
2092
2093 Ok(())
2094 }
2095
2096 /// Swaps the underlying sink for a [`crate::sink::MemorySink`] sink and returns the associated
2097 /// [`MemorySinkStorage`].
2098 ///
2099 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2100 /// terms of data durability and ordering.
2101 /// See [`Self::set_sink`] for more information.
2102 pub fn memory(&self) -> MemorySinkStorage {
2103 let sink = crate::sink::MemorySink::new(self.clone());
2104 let storage = sink.buffer();
2105 self.set_sink(Box::new(sink));
2106 storage
2107 }
2108
2109 /// Swaps the underlying sink for a [`crate::sink::BinaryStreamSink`] sink and returns the associated
2110 /// [`BinaryStreamStorage`].
2111 ///
2112 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2113 /// terms of data durability and ordering.
2114 /// See [`Self::set_sink`] for more information.
2115 pub fn binary_stream(&self) -> BinaryStreamStorage {
2116 let (sink, storage) = crate::sink::BinaryStreamSink::new(self.clone());
2117 self.set_sink(Box::new(sink));
2118 storage
2119 }
2120
2121 /// Swaps the underlying sink for a [`crate::sink::FileSink`] at the specified `path`.
2122 ///
2123 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2124 /// terms of data durability and ordering.
2125 /// See [`Self::set_sink`] for more information.
2126 pub fn save(
2127 &self,
2128 path: impl Into<std::path::PathBuf>,
2129 ) -> Result<(), crate::sink::FileSinkError> {
2130 self.save_opts(path)
2131 }
2132
2133 /// Swaps the underlying sink for a [`crate::sink::FileSink`] at the specified `path`.
2134 ///
2135 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2136 /// terms of data durability and ordering.
2137 /// See [`Self::set_sink`] for more information.
2138 ///
2139 /// If a blueprint was provided, it will be stored first in the file.
2140 /// Blueprints are currently an experimental part of the Rust SDK.
2141 pub fn save_opts(
2142 &self,
2143 path: impl Into<std::path::PathBuf>,
2144 ) -> Result<(), crate::sink::FileSinkError> {
2145 if forced_sink_path().is_some() {
2146 re_log::debug!("Ignored setting new file since {ENV_FORCE_SAVE} is set");
2147 return Ok(());
2148 }
2149
2150 let sink = crate::sink::FileSink::new(path)?;
2151
2152 self.set_sink(Box::new(sink));
2153
2154 Ok(())
2155 }
2156
2157 /// Swaps the underlying sink for a [`crate::sink::FileSink`] pointed at stdout.
2158 ///
2159 /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
2160 /// default back to `buffered` mode, in order not to break the user's terminal.
2161 ///
2162 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2163 /// terms of data durability and ordering.
2164 /// See [`Self::set_sink`] for more information.
2165 pub fn stdout(&self) -> Result<(), crate::sink::FileSinkError> {
2166 self.stdout_opts()
2167 }
2168
2169 /// Swaps the underlying sink for a [`crate::sink::FileSink`] pointed at stdout.
2170 ///
2171 /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
2172 /// default back to `buffered` mode, in order not to break the user's terminal.
2173 ///
2174 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2175 /// terms of data durability and ordering.
2176 /// See [`Self::set_sink`] for more information.
2177 ///
2178 /// If a blueprint was provided, it will be stored first in the file.
2179 /// Blueprints are currently an experimental part of the Rust SDK.
2180 pub fn stdout_opts(&self) -> Result<(), crate::sink::FileSinkError> {
2181 if forced_sink_path().is_some() {
2182 re_log::debug!("Ignored setting new file since {ENV_FORCE_SAVE} is set");
2183 return Ok(());
2184 }
2185
2186 if std::io::stdout().is_terminal() {
2187 re_log::debug!("Ignored call to stdout() because stdout is a terminal");
2188 self.set_sink(Box::new(crate::log_sink::BufferedSink::new()));
2189 return Ok(());
2190 }
2191
2192 let sink = crate::sink::FileSink::stdout()?;
2193
2194 self.set_sink(Box::new(sink));
2195
2196 Ok(())
2197 }
2198
2199 /// Swaps the underlying sink for a [`crate::sink::BufferedSink`].
2200 ///
2201 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2202 /// terms of data durability and ordering.
2203 /// See [`Self::set_sink`] for more information.
2204 pub fn disconnect(&self) {
2205 let f = move |inner: &RecordingStreamInner| {
2206 // When disconnecting, we need to make sure that pending top-level `DataLoader` threads that
2207 // were started from the SDK run to completion.
2208 inner.wait_for_dataloaders();
2209 self.set_sink(Box::new(crate::sink::BufferedSink::new()));
2210 };
2211
2212 if self.with(f).is_none() {
2213 re_log::warn_once!("Recording disabled - call to disconnect() ignored");
2214 }
2215 }
2216
2217 /// Send a blueprint through this recording stream
2218 pub fn send_blueprint(
2219 &self,
2220 blueprint: Vec<LogMsg>,
2221 activation_cmd: BlueprintActivationCommand,
2222 ) {
2223 let mut blueprint_id = None;
2224 for msg in blueprint {
2225 if blueprint_id.is_none() {
2226 blueprint_id = Some(msg.store_id().clone());
2227 }
2228 self.record_msg(msg);
2229 }
2230
2231 if let Some(blueprint_id) = blueprint_id {
2232 if blueprint_id == activation_cmd.blueprint_id {
2233 // Let the viewer know that the blueprint has been fully received,
2234 // and that it can now be activated.
2235 // We don't want to activate half-loaded blueprints, because that can be confusing,
2236 // and can also lead to problems with view heuristics.
2237 self.record_msg(activation_cmd.into());
2238 } else {
2239 re_log::warn!(
2240 "Blueprint ID mismatch when sending blueprint: {:?} != {:?}. Ignoring activation.",
2241 blueprint_id,
2242 activation_cmd.blueprint_id
2243 );
2244 }
2245 }
2246 }
2247}
2248
2249impl fmt::Debug for RecordingStream {
2250 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2251 let with = |inner: &RecordingStreamInner| {
2252 let RecordingStreamInner {
2253 // This pattern match prevents _accidentally_ omitting data from the debug output
2254 // when new fields are added.
2255 store_info,
2256 recording_info,
2257 tick,
2258 cmds_tx: _,
2259 batcher: _,
2260 batcher_to_sink_handle: _,
2261 sink_dependent_batcher_config,
2262 dataloader_handles,
2263 pid_at_creation,
2264 } = inner;
2265
2266 f.debug_struct("RecordingStream")
2267 .field("store_info", &store_info)
2268 .field("recording_info", &recording_info)
2269 .field("tick", &tick)
2270 .field(
2271 "sink_dependent_batcher_config",
2272 &sink_dependent_batcher_config,
2273 )
2274 .field("pending_dataloaders", &dataloader_handles.lock().len())
2275 .field("pid_at_creation", &pid_at_creation)
2276 .finish_non_exhaustive()
2277 };
2278
2279 match self.with(with) {
2280 Some(res) => res,
2281 None => write!(f, "RecordingStream {{ disabled }}"),
2282 }
2283 }
2284}
2285
2286// --- Stateful time ---
2287
2288/// Thread-local data.
2289#[derive(Default)]
2290struct ThreadInfo {
2291 /// The current time per-thread per-recording, which can be set by users.
2292 timepoints: HashMap<StoreId, TimePoint>,
2293}
2294
2295impl ThreadInfo {
2296 fn thread_now(rid: &StoreId) -> TimePoint {
2297 Self::with(|ti| ti.now(rid))
2298 }
2299
2300 fn set_thread_time(rid: &StoreId, timeline: TimelineName, cell: TimeCell) {
2301 Self::with(|ti| ti.set_time(rid, timeline, cell));
2302 }
2303
2304 fn unset_thread_time(rid: &StoreId, timeline: &TimelineName) {
2305 Self::with(|ti| ti.unset_time(rid, timeline));
2306 }
2307
2308 fn reset_thread_time(rid: &StoreId) {
2309 Self::with(|ti| ti.reset_time(rid));
2310 }
2311
2312 /// Get access to the thread-local [`ThreadInfo`].
2313 fn with<R>(f: impl FnOnce(&mut Self) -> R) -> R {
2314 use std::cell::RefCell;
2315 thread_local! {
2316 static THREAD_INFO: RefCell<Option<ThreadInfo>> = const { RefCell::new(None) };
2317 }
2318
2319 THREAD_INFO.with(|thread_info| {
2320 let mut thread_info = thread_info.borrow_mut();
2321 let thread_info = thread_info.get_or_insert_with(Self::default);
2322 f(thread_info)
2323 })
2324 }
2325
2326 fn now(&self, rid: &StoreId) -> TimePoint {
2327 let mut timepoint = self.timepoints.get(rid).cloned().unwrap_or_default();
2328 timepoint.insert_cell(TimelineName::log_time(), TimeCell::timestamp_now());
2329 timepoint
2330 }
2331
2332 fn set_time(&mut self, rid: &StoreId, timeline: TimelineName, cell: TimeCell) {
2333 self.timepoints
2334 .entry(rid.clone())
2335 .or_default()
2336 .insert_cell(timeline, cell);
2337 }
2338
2339 fn unset_time(&mut self, rid: &StoreId, timeline: &TimelineName) {
2340 if let Some(timepoint) = self.timepoints.get_mut(rid) {
2341 timepoint.remove(timeline);
2342 }
2343 }
2344
2345 fn reset_time(&mut self, rid: &StoreId) {
2346 if let Some(timepoint) = self.timepoints.get_mut(rid) {
2347 *timepoint = TimePoint::default();
2348 }
2349 }
2350}
2351
2352impl RecordingStream {
2353 /// Returns the current time of the recording on the current thread.
2354 pub fn now(&self) -> TimePoint {
2355 let f =
2356 move |inner: &RecordingStreamInner| ThreadInfo::thread_now(&inner.store_info.store_id);
2357 if let Some(res) = self.with(f) {
2358 res
2359 } else {
2360 re_log::warn_once!("Recording disabled - call to now() ignored");
2361 TimePoint::default()
2362 }
2363 }
2364
2365 /// Set the current time of the recording, for the current calling thread.
2366 ///
2367 /// Used for all subsequent logging performed from this same thread, until the next call
2368 /// to one of the index/time setting methods.
2369 ///
2370 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2371 ///
2372 /// See also:
2373 /// - [`Self::set_time`]
2374 /// - [`Self::set_time_sequence`]
2375 /// - [`Self::set_duration_secs`]
2376 /// - [`Self::disable_timeline`]
2377 /// - [`Self::reset_time`]
2378 pub fn set_timepoint(&self, timepoint: impl Into<TimePoint>) {
2379 let f = move |inner: &RecordingStreamInner| {
2380 let timepoint = timepoint.into();
2381 for (timeline, time) in timepoint {
2382 ThreadInfo::set_thread_time(&inner.store_info.store_id, timeline, time);
2383 }
2384 };
2385
2386 if self.with(f).is_none() {
2387 re_log::warn_once!("Recording disabled - call to set_timepoint() ignored");
2388 }
2389 }
2390
2391 /// Set the current value of one of the timelines.
2392 ///
2393 /// Used for all subsequent logging performed from this same thread, until the next call
2394 /// to one of the index/time setting methods.
2395 ///
2396 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2397 ///
2398 /// Example:
2399 /// ```no_run
2400 /// # mod rerun { pub use re_sdk::*; }
2401 /// # let rec: rerun::RecordingStream = unimplemented!();
2402 /// rec.set_time("frame_nr", rerun::TimeCell::from_sequence(42));
2403 /// rec.set_time("duration", std::time::Duration::from_millis(123));
2404 /// rec.set_time("capture_time", std::time::SystemTime::now());
2405 /// ```
2406 ///
2407 /// See also:
2408 /// - [`Self::set_timepoint`]
2409 /// - [`Self::set_time_sequence`]
2410 /// - [`Self::set_duration_secs`]
2411 /// - [`Self::disable_timeline`]
2412 /// - [`Self::reset_time`]
2413 pub fn set_time(&self, timeline: impl Into<TimelineName>, value: impl TryInto<TimeCell>) {
2414 let f = move |inner: &RecordingStreamInner| {
2415 let timeline = timeline.into();
2416 if let Ok(value) = value.try_into() {
2417 ThreadInfo::set_thread_time(&inner.store_info.store_id, timeline, value);
2418 } else {
2419 re_log::warn_once!(
2420 "set_time({timeline}): Failed to convert the given value to an TimeCell"
2421 );
2422 }
2423 };
2424
2425 if self.with(f).is_none() {
2426 re_log::warn_once!("Recording disabled - call to set_time() ignored");
2427 }
2428 }
2429
2430 /// Set the current time of the recording, for the current calling thread.
2431 ///
2432 /// Short for `set_time(timeline, rerun::TimeCell::from_sequence(sequence))`.
2433 ///
2434 /// Used for all subsequent logging performed from this same thread, until the next call
2435 /// to one of the index/time setting methods.
2436 ///
2437 /// For example: `rec.set_time_sequence("frame_nr", frame_nr)`.
2438 /// You can remove a timeline again using `rec.disable_timeline("frame_nr")`.
2439 ///
2440 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2441 ///
2442 /// See also:
2443 /// - [`Self::set_time`]
2444 /// - [`Self::set_timepoint`]
2445 /// - [`Self::set_duration_secs`]
2446 /// - [`Self::disable_timeline`]
2447 /// - [`Self::reset_time`]
2448 #[inline]
2449 pub fn set_time_sequence(&self, timeline: impl Into<TimelineName>, sequence: impl Into<i64>) {
2450 self.set_time(timeline, TimeCell::from_sequence(sequence.into()));
2451 }
2452
2453 /// Set the current time of the recording, for the current calling thread.
2454 ///
2455 /// Short for `set_time(timeline, std::time::Duration::from_secs_f64(secs))`..
2456 ///
2457 /// Used for all subsequent logging performed from this same thread, until the next call
2458 /// to one of the index/time setting methods.
2459 ///
2460 /// For example: `rec.set_duration_secs("time_since_start", time_offset)`.
2461 /// You can remove a timeline again using `rec.disable_timeline("time_since_start")`.
2462 ///
2463 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2464 ///
2465 /// See also:
2466 /// - [`Self::set_time`]
2467 /// - [`Self::set_timepoint`]
2468 /// - [`Self::set_timestamp_secs_since_epoch`]
2469 /// - [`Self::set_time_sequence`]
2470 /// - [`Self::disable_timeline`]
2471 /// - [`Self::reset_time`]
2472 #[inline]
2473 pub fn set_duration_secs(&self, timeline: impl Into<TimelineName>, secs: impl Into<f64>) {
2474 let secs = secs.into();
2475 if let Ok(duration) = std::time::Duration::try_from_secs_f64(secs) {
2476 self.set_time(timeline, duration);
2477 } else {
2478 re_log::error_once!("set_duration_secs: can't set time to {secs}");
2479 }
2480 }
2481
2482 /// Set a timestamp as seconds since Unix epoch (1970-01-01 00:00:00 UTC).
2483 ///
2484 /// Short for `self.set_time(timeline, rerun::TimeCell::from_timestamp_secs_since_epoch(secs))`.
2485 ///
2486 /// Used for all subsequent logging performed from this same thread, until the next call
2487 /// to one of the index/time setting methods.
2488 ///
2489 /// You can remove a timeline again using `rec.disable_timeline(timeline)`.
2490 ///
2491 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2492 ///
2493 /// See also:
2494 /// - [`Self::set_time`]
2495 /// - [`Self::set_timepoint`]
2496 /// - [`Self::set_duration_secs`]
2497 /// - [`Self::set_time_sequence`]
2498 /// - [`Self::set_timestamp_nanos_since_epoch`]
2499 /// - [`Self::disable_timeline`]
2500 /// - [`Self::reset_time`]
2501 #[inline]
2502 pub fn set_timestamp_secs_since_epoch(
2503 &self,
2504 timeline: impl Into<TimelineName>,
2505 secs: impl Into<f64>,
2506 ) {
2507 self.set_time(
2508 timeline,
2509 TimeCell::from_timestamp_secs_since_epoch(secs.into()),
2510 );
2511 }
2512
2513 /// Set a timestamp as nanoseconds since Unix epoch (1970-01-01 00:00:00 UTC).
2514 ///
2515 /// Short for `self.set_time(timeline, rerun::TimeCell::set_timestamp_nanos_since_epoch(secs))`.
2516 ///
2517 /// Used for all subsequent logging performed from this same thread, until the next call
2518 /// to one of the index/time setting methods.
2519 ///
2520 /// You can remove a timeline again using `rec.disable_timeline(timeline)`.
2521 ///
2522 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2523 ///
2524 /// See also:
2525 /// - [`Self::set_time`]
2526 /// - [`Self::set_timepoint`]
2527 /// - [`Self::set_duration_secs`]
2528 /// - [`Self::set_time_sequence`]
2529 /// - [`Self::set_timestamp_secs_since_epoch`]
2530 /// - [`Self::disable_timeline`]
2531 /// - [`Self::reset_time`]
2532 #[inline]
2533 pub fn set_timestamp_nanos_since_epoch(
2534 &self,
2535 timeline: impl Into<TimelineName>,
2536 nanos: impl Into<i64>,
2537 ) {
2538 self.set_time(
2539 timeline,
2540 TimeCell::from_timestamp_nanos_since_epoch(nanos.into()),
2541 );
2542 }
2543
2544 /// Clears out the current time of the recording for the specified timeline, for the
2545 /// current calling thread.
2546 ///
2547 /// For example: `rec.disable_timeline("frame")`, `rec.disable_timeline("sim_time")`.
2548 ///
2549 /// See also:
2550 /// - [`Self::set_timepoint`]
2551 /// - [`Self::set_time_sequence`]
2552 /// - [`Self::reset_time`]
2553 pub fn disable_timeline(&self, timeline: impl Into<TimelineName>) {
2554 let f = move |inner: &RecordingStreamInner| {
2555 let timeline = timeline.into();
2556 ThreadInfo::unset_thread_time(&inner.store_info.store_id, &timeline);
2557 };
2558
2559 if self.with(f).is_none() {
2560 re_log::warn_once!("Recording disabled - call to disable_timeline() ignored");
2561 }
2562 }
2563
2564 /// Clears out the current time of the recording, for the current calling thread.
2565 ///
2566 /// Used for all subsequent logging performed from this same thread, until the next call
2567 /// to one of the index/time setting methods.
2568 ///
2569 /// For example: `rec.reset_time()`.
2570 ///
2571 /// See also:
2572 /// - [`Self::set_timepoint`]
2573 /// - [`Self::set_time_sequence`]
2574 /// - [`Self::disable_timeline`]
2575 pub fn reset_time(&self) {
2576 let f = move |inner: &RecordingStreamInner| {
2577 ThreadInfo::reset_thread_time(&inner.store_info.store_id);
2578 };
2579
2580 if self.with(f).is_none() {
2581 re_log::warn_once!("Recording disabled - call to reset_time() ignored");
2582 }
2583 }
2584}
2585
2586// ---
2587
2588#[cfg(test)]
2589mod tests {
2590 use insta::assert_debug_snapshot;
2591 use itertools::Itertools as _;
2592 use re_log_types::example_components::{MyLabel, MyPoints};
2593 use re_sdk_types::SerializedComponentBatch;
2594
2595 use super::*;
2596
2597 struct DisplayDescrs(Chunk);
2598
2599 impl std::fmt::Debug for DisplayDescrs {
2600 #[inline]
2601 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2602 f.debug_list()
2603 .entries(
2604 self.0
2605 .component_descriptors()
2606 .map(|d| d.display_name().to_owned())
2607 .sorted(),
2608 )
2609 .finish()
2610 }
2611 }
2612
2613 #[test]
2614 fn impl_send_sync() {
2615 fn assert_send_sync<T: Send + Sync>() {}
2616 assert_send_sync::<RecordingStream>();
2617 }
2618
2619 #[test]
2620 fn never_flush() {
2621 let rec = RecordingStreamBuilder::new("rerun_example_never_flush")
2622 .enabled(true)
2623 .batcher_config(ChunkBatcherConfig::NEVER)
2624 .buffered()
2625 .unwrap();
2626
2627 let store_info = rec.store_info().unwrap();
2628
2629 let rows = example_rows(false);
2630 for row in rows.clone() {
2631 rec.record_row("a".into(), row, false);
2632 }
2633
2634 let storage = rec.memory();
2635 let mut msgs = {
2636 let mut msgs = storage.take();
2637 msgs.reverse();
2638 msgs
2639 };
2640
2641 // First message should be a set_store_info resulting from the original sink swap to
2642 // buffered mode.
2643 match msgs.pop().unwrap() {
2644 LogMsg::SetStoreInfo(msg) => {
2645 assert!(msg.row_id != *RowId::ZERO);
2646 similar_asserts::assert_eq!(store_info, msg.info);
2647 }
2648 _ => panic!("expected SetStoreInfo"),
2649 }
2650
2651 // Second message should be a set_store_info resulting from the later sink swap from
2652 // buffered mode into in-memory mode.
2653 // This arrives _before_ the data itself since we're using manual flushing.
2654 match msgs.pop().unwrap() {
2655 LogMsg::SetStoreInfo(msg) => {
2656 assert!(msg.row_id != *RowId::ZERO);
2657 similar_asserts::assert_eq!(store_info, msg.info);
2658 }
2659 _ => panic!("expected SetStoreInfo"),
2660 }
2661
2662 // The following flushes were sent as a result of the implicit flush when swapping the
2663 // underlying sink from buffered to in-memory.
2664
2665 // Chunk that contains the `RecordingInfo`.
2666 match msgs.pop().unwrap() {
2667 LogMsg::ArrowMsg(rid, msg) => {
2668 assert_eq!(store_info.store_id, rid);
2669
2670 let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2671
2672 chunk.sanity_check().unwrap();
2673
2674 assert_debug_snapshot!(DisplayDescrs(chunk));
2675 }
2676 _ => panic!("expected ArrowMsg"),
2677 }
2678
2679 // Final message is the batched chunk itself.
2680 match msgs.pop().unwrap() {
2681 LogMsg::ArrowMsg(rid, msg) => {
2682 assert_eq!(store_info.store_id, rid);
2683
2684 let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2685
2686 chunk.sanity_check().unwrap();
2687
2688 assert_debug_snapshot!(DisplayDescrs(chunk));
2689 }
2690 _ => panic!("expected ArrowMsg"),
2691 }
2692
2693 // That's all.
2694 assert!(msgs.pop().is_none());
2695 }
2696
2697 #[test]
2698 fn always_flush() {
2699 let rec = RecordingStreamBuilder::new("rerun_example_always_flush")
2700 .enabled(true)
2701 .batcher_config(ChunkBatcherConfig::ALWAYS)
2702 .buffered()
2703 .unwrap();
2704
2705 let store_info = rec.store_info().unwrap();
2706
2707 let rows = example_rows(false);
2708 for row in rows.clone() {
2709 rec.record_row("a".into(), row, false);
2710 }
2711
2712 let storage = rec.memory();
2713 let mut msgs = {
2714 let mut msgs = storage.take();
2715 msgs.reverse();
2716 msgs
2717 };
2718
2719 // First message should be a set_store_info resulting from the original sink swap to
2720 // buffered mode.
2721 match msgs.pop().unwrap() {
2722 LogMsg::SetStoreInfo(msg) => {
2723 assert!(msg.row_id != *RowId::ZERO);
2724 similar_asserts::assert_eq!(store_info, msg.info);
2725 }
2726 _ => panic!("expected SetStoreInfo"),
2727 }
2728
2729 // Second message should be a set_store_info resulting from the later sink swap from
2730 // buffered mode into in-memory mode.
2731 // This arrives _before_ the data itself since we're using manual flushing.
2732 match msgs.pop().unwrap() {
2733 LogMsg::SetStoreInfo(msg) => {
2734 assert!(msg.row_id != *RowId::ZERO);
2735 similar_asserts::assert_eq!(store_info, msg.info);
2736 }
2737 _ => panic!("expected SetStoreInfo"),
2738 }
2739
2740 let mut assert_next_row = || match msgs.pop().unwrap() {
2741 LogMsg::ArrowMsg(rid, msg) => {
2742 assert_eq!(store_info.store_id, rid);
2743
2744 let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2745
2746 chunk.sanity_check().unwrap();
2747
2748 assert_debug_snapshot!(DisplayDescrs(chunk));
2749 }
2750 _ => panic!("expected ArrowMsg"),
2751 };
2752
2753 // 3rd, 4th, 5th, and 6th messages are all the single-row batched chunks themselves,
2754 // which were sent as a result of the implicit flush when swapping the underlying sink
2755 // from buffered to in-memory. Note that these messages contain the 2 recording property
2756 // chunks.
2757 assert_next_row(); // Contains `RecordingInfo`
2758 assert_next_row();
2759 assert_next_row();
2760 assert_next_row();
2761
2762 // That's all.
2763 assert!(msgs.pop().is_none());
2764 }
2765
2766 #[test]
2767 fn flush_hierarchy() {
2768 let (rec, storage) = RecordingStreamBuilder::new("rerun_example_flush_hierarchy")
2769 .enabled(true)
2770 .batcher_config(ChunkBatcherConfig::NEVER)
2771 .memory()
2772 .unwrap();
2773
2774 let store_info = rec.store_info().unwrap();
2775
2776 let rows = example_rows(false);
2777 for row in rows.clone() {
2778 rec.record_row("a".into(), row, false);
2779 }
2780
2781 {
2782 let mut msgs = {
2783 let mut msgs = storage.take();
2784 msgs.reverse();
2785 msgs
2786 };
2787
2788 // First message should be a set_store_info resulting from the original sink swap
2789 // to in-memory mode.
2790 match msgs.pop().unwrap() {
2791 LogMsg::SetStoreInfo(msg) => {
2792 assert!(msg.row_id != *RowId::ZERO);
2793 similar_asserts::assert_eq!(store_info, msg.info);
2794 }
2795 _ => panic!("expected SetStoreInfo"),
2796 }
2797
2798 // For reasons, MemorySink ends up with 2 StoreInfos.
2799 // TODO(jleibs): Avoid a redundant StoreInfo message.
2800 match msgs.pop().unwrap() {
2801 LogMsg::SetStoreInfo(msg) => {
2802 assert!(msg.row_id != *RowId::ZERO);
2803 similar_asserts::assert_eq!(store_info, msg.info);
2804 }
2805 _ => panic!("expected SetStoreInfo"),
2806 }
2807
2808 // MemorySinkStorage transparently handles flushing during `take()`!
2809
2810 // The batch that contains the `RecordingInfo`.
2811 match msgs.pop().unwrap() {
2812 LogMsg::ArrowMsg(rid, msg) => {
2813 assert_eq!(store_info.store_id, rid);
2814
2815 let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2816
2817 chunk.sanity_check().unwrap();
2818
2819 assert_debug_snapshot!(DisplayDescrs(chunk));
2820 }
2821 _ => panic!("expected ArrowMsg"),
2822 }
2823
2824 // The batched chunk itself, which was sent as a result of the explicit flush above.
2825 match msgs.pop().unwrap() {
2826 LogMsg::ArrowMsg(rid, msg) => {
2827 assert_eq!(store_info.store_id, rid);
2828
2829 let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2830
2831 chunk.sanity_check().unwrap();
2832
2833 assert_debug_snapshot!(DisplayDescrs(chunk));
2834 }
2835 _ => panic!("expected ArrowMsg"),
2836 }
2837
2838 // That's all.
2839 assert!(msgs.pop().is_none());
2840 }
2841 }
2842
2843 #[test]
2844 fn disabled() {
2845 let (rec, storage) = RecordingStreamBuilder::new("rerun_example_disabled")
2846 .enabled(false)
2847 .batcher_config(ChunkBatcherConfig::ALWAYS)
2848 .memory()
2849 .unwrap();
2850
2851 assert_eq!(rec.ref_count(), 0);
2852
2853 let rows = example_rows(false);
2854 for row in rows.clone() {
2855 rec.record_row("a".into(), row, false);
2856 }
2857
2858 let mut msgs = {
2859 let mut msgs = storage.take();
2860 msgs.reverse();
2861 msgs
2862 };
2863
2864 // That's all.
2865 assert!(msgs.pop().is_none());
2866 }
2867
2868 #[test]
2869 fn test_set_thread_local() {
2870 // Regression-test for https://github.com/rerun-io/rerun/issues/2889
2871 std::thread::Builder::new()
2872 .name("test_thead".to_owned())
2873 .spawn(|| {
2874 let stream = RecordingStreamBuilder::new("rerun_example_test")
2875 .buffered()
2876 .unwrap();
2877 RecordingStream::set_thread_local(StoreKind::Recording, Some(stream));
2878 })
2879 .unwrap()
2880 .join()
2881 .unwrap();
2882 }
2883
2884 fn example_rows(static_: bool) -> Vec<PendingRow> {
2885 use re_log_types::example_components::{MyColor, MyLabel, MyPoint};
2886 use re_sdk_types::Loggable;
2887
2888 let mut tick = 0i64;
2889 let mut timepoint = |frame_nr: i64| {
2890 let mut tp = TimePoint::default();
2891 if !static_ {
2892 tp.insert(Timeline::log_time(), re_log_types::Timestamp::now());
2893 tp.insert(Timeline::log_tick(), tick);
2894 tp.insert(Timeline::new_sequence("frame_nr"), frame_nr);
2895 }
2896 tick += 1;
2897 tp
2898 };
2899
2900 let row0 = {
2901 PendingRow {
2902 row_id: RowId::new(),
2903 timepoint: timepoint(1),
2904 components: [
2905 (
2906 MyPoints::descriptor_points().component,
2907 SerializedComponentBatch::new(
2908 <MyPoint as Loggable>::to_arrow([
2909 MyPoint::new(10.0, 10.0),
2910 MyPoint::new(20.0, 20.0),
2911 ])
2912 .unwrap(),
2913 MyPoints::descriptor_points(),
2914 ),
2915 ), //
2916 (
2917 MyPoints::descriptor_colors().component,
2918 SerializedComponentBatch::new(
2919 <MyColor as Loggable>::to_arrow([MyColor(0x8080_80FF)]).unwrap(),
2920 MyPoints::descriptor_colors(),
2921 ),
2922 ), //
2923 (
2924 MyPoints::descriptor_labels().component,
2925 SerializedComponentBatch::new(
2926 <MyLabel as Loggable>::to_arrow([] as [MyLabel; 0]).unwrap(),
2927 MyPoints::descriptor_labels(),
2928 ),
2929 ), //
2930 ]
2931 .into_iter()
2932 .collect(),
2933 }
2934 };
2935
2936 let row1 = {
2937 PendingRow {
2938 row_id: RowId::new(),
2939 timepoint: timepoint(1),
2940 components: [
2941 (
2942 MyPoints::descriptor_points().component,
2943 SerializedComponentBatch::new(
2944 <MyPoint as Loggable>::to_arrow([] as [MyPoint; 0]).unwrap(),
2945 MyPoints::descriptor_points(),
2946 ),
2947 ), //
2948 (
2949 MyPoints::descriptor_colors().component,
2950 SerializedComponentBatch::new(
2951 <MyColor as Loggable>::to_arrow([] as [MyColor; 0]).unwrap(),
2952 MyPoints::descriptor_colors(),
2953 ),
2954 ), //
2955 (
2956 MyPoints::descriptor_labels().component,
2957 SerializedComponentBatch::new(
2958 <MyLabel as Loggable>::to_arrow([] as [MyLabel; 0]).unwrap(),
2959 MyPoints::descriptor_labels(),
2960 ),
2961 ), //
2962 ]
2963 .into_iter()
2964 .collect(),
2965 }
2966 };
2967
2968 let row2 = {
2969 PendingRow {
2970 row_id: RowId::new(),
2971 timepoint: timepoint(1),
2972 components: [
2973 (
2974 MyPoints::descriptor_points().component,
2975 SerializedComponentBatch::new(
2976 <MyPoint as Loggable>::to_arrow([] as [MyPoint; 0]).unwrap(),
2977 MyPoints::descriptor_points(),
2978 ),
2979 ), //
2980 (
2981 MyPoints::descriptor_colors().component,
2982 SerializedComponentBatch::new(
2983 <MyColor as Loggable>::to_arrow([MyColor(0xFFFF_FFFF)]).unwrap(),
2984 MyPoints::descriptor_colors(),
2985 ),
2986 ), //
2987 (
2988 MyPoints::descriptor_labels().component,
2989 SerializedComponentBatch::new(
2990 <MyLabel as Loggable>::to_arrow([MyLabel("hey".into())]).unwrap(),
2991 MyPoints::descriptor_labels(),
2992 ),
2993 ), //
2994 ]
2995 .into_iter()
2996 .collect(),
2997 }
2998 };
2999
3000 vec![row0, row1, row2]
3001 }
3002
3003 // See <https://github.com/rerun-io/rerun/pull/8587> for context.
3004 #[test]
3005 fn allows_componentbatch_unsized() {
3006 let labels = [
3007 MyLabel("a".into()),
3008 MyLabel("b".into()),
3009 MyLabel("c".into()),
3010 ];
3011
3012 let (rec, _mem) = RecordingStreamBuilder::new("rerun_example_test_componentbatch_unsized")
3013 .default_enabled(false)
3014 .enabled(false)
3015 .memory()
3016 .unwrap();
3017
3018 // This call used to *not* compile due to a lack of `?Sized` bounds.
3019 use re_sdk_types::ComponentBatch as _;
3020 rec.log(
3021 "labels",
3022 &labels
3023 .try_serialized(MyPoints::descriptor_labels())
3024 .unwrap(),
3025 )
3026 .unwrap();
3027 }
3028
3029 struct BatcherConfigTestSink {
3030 config: ChunkBatcherConfig,
3031 }
3032
3033 impl LogSink for BatcherConfigTestSink {
3034 fn default_batcher_config(&self) -> ChunkBatcherConfig {
3035 self.config.clone()
3036 }
3037
3038 fn send(&self, _msg: LogMsg) {
3039 // noop
3040 }
3041
3042 fn flush_blocking(&self, _timeout: Duration) -> Result<(), SinkFlushError> {
3043 Ok(())
3044 }
3045
3046 fn as_any(&self) -> &dyn std::any::Any {
3047 self
3048 }
3049 }
3050
3051 struct ScopedEnvVarSet {
3052 key: &'static str,
3053 }
3054
3055 impl ScopedEnvVarSet {
3056 #[expect(unsafe_code)]
3057 fn new(key: &'static str, value: &'static str) -> Self {
3058 // SAFETY: only used in tests.
3059 unsafe { std::env::set_var(key, value) };
3060 Self { key }
3061 }
3062 }
3063
3064 impl Drop for ScopedEnvVarSet {
3065 #[expect(unsafe_code)]
3066 fn drop(&mut self) {
3067 // SAFETY: only used in tests.
3068 unsafe {
3069 std::env::remove_var(self.key);
3070 }
3071 }
3072 }
3073
3074 const CONFIG_CHANGE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
3075
3076 fn clear_environment() {
3077 // SAFETY: only used in tests.
3078 #[expect(unsafe_code)]
3079 unsafe {
3080 std::env::remove_var("RERUN_CHUNK_MAX_ROWS_IF_UNSORTED");
3081 std::env::remove_var("RERUN_FLUSH_NUM_BYTES");
3082 std::env::remove_var("RERUN_FLUSH_NUM_ROWS");
3083 std::env::remove_var("RERUN_FLUSH_TICK_SECS");
3084 std::env::remove_var("RERUN_MAX_CHUNK_ROWS_IF_UNSORTED");
3085 }
3086 }
3087
3088 #[test]
3089 fn test_sink_dependent_batcher_config() {
3090 clear_environment();
3091
3092 let (tx, rx) = std::sync::mpsc::channel();
3093
3094 let rec = RecordingStreamBuilder::new("rerun_example_test_batcher_config")
3095 .batcher_hooks(BatcherHooks {
3096 on_config_change: Some(Arc::new(move |config: &ChunkBatcherConfig| {
3097 tx.send(config.clone()).unwrap();
3098 })),
3099 ..BatcherHooks::NONE
3100 })
3101 .buffered()
3102 .unwrap();
3103
3104 let new_config = rx
3105 .recv_timeout(CONFIG_CHANGE_TIMEOUT)
3106 .expect("no config change message received within timeout");
3107 assert_eq!(
3108 new_config,
3109 ChunkBatcherConfig::from_env().unwrap(),
3110 "Buffered sink should uses the config from the environment"
3111 );
3112
3113 // Change sink to our custom sink. Will it take over the setting?
3114 let injected_config = ChunkBatcherConfig {
3115 flush_tick: std::time::Duration::from_secs(123),
3116 flush_num_bytes: 123,
3117 flush_num_rows: 123,
3118 ..new_config
3119 };
3120 rec.set_sink(Box::new(BatcherConfigTestSink {
3121 config: injected_config.clone(),
3122 }));
3123 let new_config = rx
3124 .recv_timeout(CONFIG_CHANGE_TIMEOUT)
3125 .expect("no config change message received within timeout");
3126
3127 assert_eq!(new_config, injected_config);
3128
3129 // Set flush num bytes through env var and set the sink again.
3130 // check that the env var is respected.
3131 let _scoped_env_guard = ScopedEnvVarSet::new("RERUN_FLUSH_NUM_BYTES", "456");
3132 rec.set_sink(Box::new(BatcherConfigTestSink {
3133 config: injected_config.clone(),
3134 }));
3135 let new_config = rx
3136 .recv_timeout(CONFIG_CHANGE_TIMEOUT)
3137 .expect("no config change message received within timeout");
3138 assert_eq!(
3139 new_config,
3140 ChunkBatcherConfig {
3141 flush_num_bytes: 456,
3142 ..injected_config
3143 },
3144 );
3145 }
3146
3147 #[test]
3148 fn test_explicit_batcher_config() {
3149 clear_environment();
3150
3151 // This environment variable should *not* override the explicit config.
3152 let _scoped_env_guard = ScopedEnvVarSet::new("RERUN_FLUSH_TICK_SECS", "456");
3153 let explicit_config = ChunkBatcherConfig {
3154 flush_tick: std::time::Duration::from_secs(123),
3155 flush_num_bytes: 123,
3156 flush_num_rows: 123,
3157 ..ChunkBatcherConfig::DEFAULT
3158 };
3159
3160 let (tx, rx) = std::sync::mpsc::channel();
3161 let rec = RecordingStreamBuilder::new("rerun_example_test_batcher_config")
3162 .batcher_config(explicit_config.clone())
3163 .batcher_hooks(BatcherHooks {
3164 on_config_change: Some(Arc::new(move |config: &ChunkBatcherConfig| {
3165 tx.send(config.clone()).unwrap();
3166 })),
3167 ..BatcherHooks::NONE
3168 })
3169 .buffered()
3170 .unwrap();
3171
3172 let new_config = rx
3173 .recv_timeout(CONFIG_CHANGE_TIMEOUT)
3174 .expect("no config change message received within timeout");
3175 assert_eq!(new_config, explicit_config);
3176
3177 // Changing the sink should have no effect since an explicit config is in place.
3178 rec.set_sink(Box::new(BatcherConfigTestSink {
3179 config: ChunkBatcherConfig::ALWAYS,
3180 }));
3181 // Don't want to stall the test for CONFIG_CHANGE_TIMEOUT here.
3182 let new_config_recv_result = rx.recv_timeout(std::time::Duration::from_millis(100));
3183 assert_eq!(
3184 new_config_recv_result,
3185 Err(std::sync::mpsc::RecvTimeoutError::Timeout)
3186 );
3187 }
3188}