re_sdk/
recording_stream.rs

use std::fmt;
use std::io::IsTerminal as _;
use std::sync::atomic::AtomicI64;
use std::sync::{Arc, Weak};
use std::time::Duration;

use ahash::HashMap;
use crossbeam::channel::{Receiver, RecvTimeoutError, Sender};
use itertools::Either;
use nohash_hasher::IntMap;
use parking_lot::Mutex;
use re_chunk::{
    BatcherFlushError, BatcherHooks, Chunk, ChunkBatcher, ChunkBatcherConfig, ChunkBatcherError,
    ChunkComponents, ChunkError, ChunkId, PendingRow, RowId, TimeColumn,
};
use re_log_types::{
    ApplicationId, ArrowRecordBatchReleaseCallback, BlueprintActivationCommand, EntityPath, LogMsg,
    RecordingId, StoreId, StoreInfo, StoreKind, StoreSource, TimeCell, TimeInt, TimePoint,
    Timeline, TimelineName,
};
use re_sdk_types::archetypes::RecordingInfo;
use re_sdk_types::components::Timestamp;
use re_sdk_types::{AsComponents, SerializationError, SerializedComponentColumn};

use crate::binary_stream_sink::BinaryStreamStorage;
use crate::sink::{LogSink, MemorySinkStorage, SinkFlushError};

// ---

/// Private environment variable meant for tests.
///
/// When set, all recording streams will write to disk at the path indicated by the env-var,
/// rather than doing what they were asked to do: `connect_grpc()`, `buffered()`, and even
/// `save()` will all re-use the same sink.
const ENV_FORCE_SAVE: &str = "_RERUN_TEST_FORCE_SAVE";

/// Returns the path of the forced sink if the private environment variable
/// `_RERUN_TEST_FORCE_SAVE` is set.
///
/// Newly created [`RecordingStream`]s should use a [`crate::sink::FileSink`] pointing to this path.
/// Furthermore, [`RecordingStream::set_sink`] calls after this should not swap out to a new sink but re-use the existing one.
/// Note that creating a new [`crate::sink::FileSink`] to the same file path (even temporarily) can cause
/// a race between file creation (and thus clearing) and pending file writes.
pub fn forced_sink_path() -> Option<String> {
    std::env::var(ENV_FORCE_SAVE).ok()
}

/// Errors that can occur when creating/manipulating a [`RecordingStream`].
#[derive(thiserror::Error, Debug)]
pub enum RecordingStreamError {
    /// Error within the underlying file sink.
    #[error("Failed to create the underlying file sink: {0}")]
    FileSink(#[from] re_log_encoding::FileSinkError),

    /// Error while building a chunk from the logged data.
    #[error("Failed to convert data to a valid chunk: {0}")]
    Chunk(#[from] ChunkError),

    /// Error within the underlying chunk batcher.
    #[error("Failed to spawn the underlying batcher: {0}")]
    ChunkBatcher(#[from] ChunkBatcherError),

    /// Error within the underlying serializer.
    #[error("Failed to serialize component data: {0}")]
    Serialization(#[from] SerializationError),

    /// Error spawning one of the background threads.
    #[error("Failed to spawn background thread '{name}': {err}")]
    SpawnThread {
        /// Name of the thread
        name: String,

        /// Inner error explaining why the thread failed to spawn.
        err: std::io::Error,
    },

    /// Error spawning a Rerun Viewer process.
    #[error(transparent)] // makes bubbling all the way up to main look nice
    SpawnViewer(#[from] crate::SpawnError),

    /// Failure to host a web viewer and/or Rerun server.
    #[cfg(feature = "web_viewer")]
    #[error(transparent)]
    WebSink(#[from] crate::web_viewer::WebViewerSinkError),

    /// An error occurred while attempting to use a [`re_data_loader::DataLoader`].
    #[cfg(feature = "data_loaders")]
    #[error(transparent)]
    DataLoaderError(#[from] re_data_loader::DataLoaderError),

    /// Invalid gRPC server address.
    #[error(transparent)]
    UriError(#[from] re_uri::Error),

    /// Invalid endpoint
    #[error("not a `/proxy` endpoint")]
    NotAProxyEndpoint,

    /// Invalid bind IP.
    #[error(transparent)]
    InvalidAddress(#[from] std::net::AddrParseError),
}

/// Result type for fallible operations on a [`RecordingStream`].
pub type RecordingStreamResult<T> = Result<T, RecordingStreamError>;

// ---

/// Construct a [`RecordingStream`].
///
/// ```no_run
/// # use re_sdk::RecordingStreamBuilder;
/// let rec = RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
///
/// Automatically sends a [`Chunk`] with the default [`RecordingInfo`] to
/// the sink, unless an explicit `recording_id` is set via [`RecordingStreamBuilder::recording_id`].
#[derive(Debug)]
pub struct RecordingStreamBuilder {
    application_id: ApplicationId,
    store_kind: StoreKind,
    recording_id: Option<RecordingId>,
    store_source: Option<StoreSource>,

    default_enabled: bool,
    enabled: Option<bool>,

    batcher_hooks: BatcherHooks,
    batcher_config: Option<ChunkBatcherConfig>,

    // Optional user-defined recording properties.
    should_send_properties: bool,
    recording_info: RecordingInfo,
}

impl RecordingStreamBuilder {
    /// Create a new [`RecordingStreamBuilder`] with the given [`ApplicationId`].
    ///
    /// The [`ApplicationId`] is usually the name of your app.
    ///
    /// ```no_run
    /// # use re_sdk::RecordingStreamBuilder;
    /// let rec = RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    //
    // NOTE: track_caller so that we can see if we are being called from an official example.
    #[track_caller]
    pub fn new(application_id: impl Into<ApplicationId>) -> Self {
        let application_id = application_id.into();

        Self {
            application_id,
            store_kind: StoreKind::Recording,
            recording_id: None,
            store_source: None,

            default_enabled: true,
            enabled: None,

            batcher_config: None,
            batcher_hooks: BatcherHooks::NONE,

            should_send_properties: true,
            recording_info: RecordingInfo::new()
                .with_start_time(re_sdk_types::components::Timestamp::now()),
        }
    }

    /// Create a new [`RecordingStreamBuilder`] with the given [`StoreId`].
    //
    // NOTE: track_caller so that we can see if we are being called from an official example.
    #[track_caller]
    pub fn from_store_id(store_id: &StoreId) -> Self {
        Self {
            application_id: store_id.application_id().clone(),
            store_kind: store_id.kind(),
            recording_id: Some(store_id.recording_id().clone()),
            store_source: None,

            default_enabled: true,
            enabled: None,

            batcher_config: None,
            batcher_hooks: BatcherHooks::NONE,

            should_send_properties: true,
            recording_info: RecordingInfo::new()
                .with_start_time(re_sdk_types::components::Timestamp::now()),
        }
    }

    /// Set whether or not Rerun is enabled by default.
    ///
    /// If the `RERUN` environment variable is set, it will override this.
    ///
    /// See also: [`Self::enabled`].
    #[inline]
    pub fn default_enabled(mut self, default_enabled: bool) -> Self {
        self.default_enabled = default_enabled;
        self
    }

    /// Set whether or not Rerun is enabled.
    ///
    /// Setting this will ignore the `RERUN` environment variable.
    ///
    /// See also: [`Self::default_enabled`].
    #[inline]
    pub fn enabled(mut self, enabled: bool) -> Self {
        self.enabled = Some(enabled);
        self
    }

    /// Set the `RecordingId` for this context.
    ///
    /// If you're logging from multiple processes and want all the messages to end up in the same
    /// recording, you must make sure that they all set the same `RecordingId` using this function.
    ///
    /// Note that many stores can share the same [`ApplicationId`], but they all have
    /// unique `RecordingId`s.
    ///
    /// The default is to use a random `RecordingId`.
    ///
    /// When explicitly setting a `RecordingId`, the initial chunk that contains the recording
    /// properties will not be sent.
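    ///
    /// A minimal sketch of the multi-process case (illustrative only; whether a string
    /// literal converts into a `RecordingId` like this is an assumption):
    ///
    /// ```ignore
    /// // Run this in every process that should contribute to the same recording:
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
    ///     .recording_id("shared-recording-id")
    ///     .connect_grpc()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```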
    #[inline]
    pub fn recording_id(mut self, recording_id: impl Into<RecordingId>) -> Self {
        self.recording_id = Some(recording_id.into());
        self.send_properties(false)
    }

    /// Sets an optional name for the recording.
    #[inline]
    pub fn recording_name(mut self, name: impl Into<String>) -> Self {
        self.recording_info = self.recording_info.with_name(name.into());
        self
    }

    /// Sets an optional start time for the recording.
    #[inline]
    pub fn recording_started(mut self, started: impl Into<Timestamp>) -> Self {
        self.recording_info = self.recording_info.with_start_time(started);
        self
    }

    /// Whether the [`RecordingInfo`] chunk should be sent.
    #[inline]
    pub fn send_properties(mut self, should_send: bool) -> Self {
        self.should_send_properties = should_send;
        self
    }

    /// Specifies the configuration of the internal data batching mechanism.
    ///
    /// If not set, the default configuration for the currently active sink will be used.
    /// Any environment variables as specified on [`ChunkBatcherConfig`] will always override respective settings.
    ///
    /// See [`ChunkBatcher`] & [`ChunkBatcherConfig`] for more information.
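    ///
    /// A sketch of overriding a single setting (the `flush_tick` field name is an
    /// assumption about [`ChunkBatcherConfig`]'s layout):
    ///
    /// ```ignore
    /// let config = re_chunk::ChunkBatcherConfig {
    ///     flush_tick: std::time::Duration::from_secs(1), // flush at most once per second
    ///     ..Default::default()
    /// };
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
    ///     .batcher_config(config)
    ///     .buffered()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```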
    #[inline]
    pub fn batcher_config(mut self, config: ChunkBatcherConfig) -> Self {
        self.batcher_config = Some(config);
        self
    }

    /// Specifies callbacks for the batcher thread.
    ///
    /// See [`ChunkBatcher`] & [`BatcherHooks`] for more information.
    #[inline]
    pub fn batcher_hooks(mut self, hooks: BatcherHooks) -> Self {
        self.batcher_hooks = hooks;
        self
    }

    #[doc(hidden)]
    #[inline]
    pub fn store_source(mut self, store_source: StoreSource) -> Self {
        self.store_source = Some(store_source);
        self
    }

    #[doc(hidden)]
    #[inline]
    pub fn blueprint(mut self) -> Self {
        self.store_kind = StoreKind::Blueprint;
        self
    }

    /// Creates a new [`RecordingStream`] that starts in a buffering state (RAM).
    ///
    /// ## Example
    ///
    /// ```
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").buffered()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn buffered(self) -> RecordingStreamResult<RecordingStream> {
        let sink = crate::log_sink::BufferedSink::new();
        let (enabled, store_info, properties, batcher_config, batcher_hooks) = self.into_args();
        if enabled {
            RecordingStream::new(
                store_info,
                properties,
                batcher_config,
                batcher_hooks,
                Box::new(sink),
            )
        } else {
            re_log::debug!("Rerun disabled - call to buffered() ignored");
            Ok(RecordingStream::disabled())
        }
    }

    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
    /// [`crate::log_sink::MemorySink`].
    ///
    /// ## Example
    ///
    /// ```
    /// # fn log_data(_: &re_sdk::RecordingStream) { }
    ///
    /// let (rec, storage) = re_sdk::RecordingStreamBuilder::new("rerun_example_app").memory()?;
    ///
    /// log_data(&rec);
    ///
    /// let data = storage.take();
    ///
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn memory(
        self,
    ) -> RecordingStreamResult<(RecordingStream, crate::log_sink::MemorySinkStorage)> {
        let (enabled, store_info, properties, batcher_config, batcher_hooks) = self.into_args();
        let rec = if enabled {
            RecordingStream::new(
                store_info,
                properties,
                batcher_config,
                batcher_hooks,
                Box::new(crate::log_sink::BufferedSink::new()),
            )
        } else {
            re_log::debug!("Rerun disabled - call to memory() ignored");
            Ok(RecordingStream::disabled())
        }?;

        let sink = crate::log_sink::MemorySink::new(rec.clone());
        let storage = sink.buffer();
        // Using set_sink here is necessary because the MemorySink needs to know
        // its own RecordingStream, which means we can't use `new` above.
        // This has the downside of a bit of creation overhead and an extra StoreInfo
        // message being sent to the sink.
        // TODO(jleibs): Figure out a cleaner way to handle this.
        rec.set_sink(Box::new(sink));
        Ok((rec, storage))
    }

    /// Creates a new [`RecordingStream`] pre-configured to stream data to multiple sinks.
    ///
    /// Currently only supports [`GrpcSink`][grpc_sink] and [`FileSink`][file_sink].
    ///
    /// If the batcher configuration has not been set explicitly or by environment variables,
    /// this will change the batcher configuration to a conservative mix (flushing less often)
    /// of the underlying sinks' default configurations.
    ///
    /// [grpc_sink]: crate::sink::GrpcSink
    /// [file_sink]: crate::sink::FileSink
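    ///
    /// A sketch of fanning out to a viewer and a file at once (the exact sink
    /// constructors and argument types are assumptions; see [`crate::sink`] for
    /// what is actually available):
    ///
    /// ```ignore
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
    ///     .set_sinks((
    ///         re_sdk::sink::GrpcSink::new("rerun+http://127.0.0.1:9876/proxy".parse()?),
    ///         re_sdk::sink::FileSink::new("my_recording.rrd")?,
    ///     ))?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```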
    pub fn set_sinks(
        self,
        sinks: impl crate::sink::IntoMultiSink,
    ) -> RecordingStreamResult<RecordingStream> {
        let (enabled, store_info, properties, batcher_config, batcher_hooks) = self.into_args();
        if enabled {
            RecordingStream::new(
                store_info,
                properties,
                batcher_config,
                batcher_hooks,
                Box::new(sinks.into_multi_sink()),
            )
        } else {
            re_log::debug!("Rerun disabled - call to set_sinks() ignored");
            Ok(RecordingStream::disabled())
        }
    }

    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
    /// remote Rerun instance.
    ///
    /// See also [`Self::connect_grpc_opts`] if you wish to configure the connection.
    ///
    /// ## Example
    ///
    /// ```no_run
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").connect_grpc()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn connect_grpc(self) -> RecordingStreamResult<RecordingStream> {
        self.connect_grpc_opts(format!(
            "rerun+http://127.0.0.1:{}/proxy",
            re_grpc_server::DEFAULT_SERVER_PORT
        ))
    }

    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
    /// remote Rerun instance.
    ///
    /// ## Example
    ///
    /// ```no_run
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
    ///     .connect_grpc_opts("rerun+http://127.0.0.1:9876/proxy")?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn connect_grpc_opts(
        self,
        url: impl Into<String>,
    ) -> RecordingStreamResult<RecordingStream> {
        let (enabled, store_info, properties, batcher_config, batcher_hooks) = self.into_args();
        if enabled {
            let url: String = url.into();
            let re_uri::RedapUri::Proxy(uri) = url.as_str().parse()? else {
                return Err(RecordingStreamError::NotAProxyEndpoint);
            };

            RecordingStream::new(
                store_info,
                properties,
                batcher_config,
                batcher_hooks,
                Box::new(crate::log_sink::GrpcSink::new(uri)),
            )
        } else {
            re_log::debug!("Rerun disabled - call to connect_grpc() ignored");
            Ok(RecordingStream::disabled())
        }
    }

    #[cfg(feature = "server")]
    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
    /// locally hosted gRPC server.
    ///
    /// The server is hosted on the default IP and port, and may be connected to by any SDK or Viewer
    /// at `rerun+http://127.0.0.1:9876/proxy` or by just running `rerun --connect`.
    ///
    /// To configure the gRPC server's IP and port, use [`Self::serve_grpc_opts`] instead.
    ///
    /// The gRPC server will buffer the log data in memory so that late-connecting viewers will
    /// still get all of it. You can limit the amount of data buffered by the gRPC server using
    /// [`Self::serve_grpc_opts`]. Once the memory limit is reached, the earliest logged data
    /// will be dropped. Static data is never dropped.
    ///
    /// If both the server and client are running on the same machine, it is highly recommended
    /// that you use [`Self::serve_grpc_opts`] and set the memory limit to `0B`, otherwise you're
    /// potentially doubling your memory usage!
    pub fn serve_grpc(self) -> RecordingStreamResult<RecordingStream> {
        use re_grpc_server::ServerOptions;

        self.serve_grpc_opts(
            "0.0.0.0",
            crate::DEFAULT_SERVER_PORT,
            ServerOptions {
                memory_limit: re_memory::MemoryLimit::from_fraction_of_total(0.25),
                ..Default::default()
            },
        )
    }

    #[cfg(feature = "server")]
    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
    /// locally hosted gRPC server.
    ///
    /// The server is hosted on the given `bind_ip` and `port`, and may be connected to by any SDK
    /// or Viewer at `rerun+http://{bind_ip}:{port}/proxy`.
    ///
    /// `0.0.0.0` is a good default for `bind_ip`.
    ///
    /// The gRPC server will buffer all log data in memory so that late-connecting viewers will
    /// get all the data. You can limit the amount of data buffered by the gRPC server with the
    /// `server_options` argument. Once the limit is reached, the earliest logged data will be
    /// dropped. Static data is never dropped.
    ///
    /// If server & client are running on the same machine and all clients are expected to connect
    /// before any data is sent, it is highly recommended that you set the memory limit to `0B`,
    /// otherwise you're potentially doubling your memory usage!
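    ///
    /// A sketch for the same-machine case described above
    /// (`MemoryLimit::from_bytes` is an assumption about `re_memory`'s API):
    ///
    /// ```ignore
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
    ///     .serve_grpc_opts(
    ///         "0.0.0.0",
    ///         9876,
    ///         re_grpc_server::ServerOptions {
    ///             memory_limit: re_memory::MemoryLimit::from_bytes(0), // `0B`: buffer nothing
    ///             ..Default::default()
    ///         },
    ///     )?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```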
    pub fn serve_grpc_opts(
        self,
        bind_ip: impl AsRef<str>,
        port: u16,
        server_options: re_grpc_server::ServerOptions,
    ) -> RecordingStreamResult<RecordingStream> {
        let (enabled, store_info, properties, batcher_config, batcher_hooks) = self.into_args();
        if enabled {
            RecordingStream::new(
                store_info,
                properties,
                batcher_config,
                batcher_hooks,
                Box::new(crate::grpc_server::GrpcServerSink::new(
                    bind_ip.as_ref(),
                    port,
                    server_options,
                )?),
            )
        } else {
            re_log::debug!("Rerun disabled - call to serve_grpc() ignored");
            Ok(RecordingStream::disabled())
        }
    }

    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to an
    /// RRD file on disk.
    ///
    /// The Rerun Viewer is able to read continuously from the resulting rrd file while it is being written.
    /// However, depending on your OS and configuration, changes may not be immediately visible due to file caching.
    /// This is a common issue on Windows and (to a lesser extent) on macOS.
    ///
    /// ## Example
    ///
    /// ```no_run
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    #[cfg(not(target_arch = "wasm32"))]
    pub fn save(
        self,
        path: impl Into<std::path::PathBuf>,
    ) -> RecordingStreamResult<RecordingStream> {
        let (enabled, store_info, properties, batcher_config, batcher_hooks) = self.into_args();

        if enabled {
            RecordingStream::new(
                store_info,
                properties,
                batcher_config,
                batcher_hooks,
                Box::new(crate::sink::FileSink::new(path)?),
            )
        } else {
            re_log::debug!("Rerun disabled - call to save() ignored");
            Ok(RecordingStream::disabled())
        }
    }

    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to stdout.
    ///
    /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
    /// default back to `buffered` mode, in order not to break the user's terminal.
    ///
    /// ## Example
    ///
    /// ```no_run
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").stdout()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    #[cfg(not(target_arch = "wasm32"))]
    pub fn stdout(self) -> RecordingStreamResult<RecordingStream> {
        if std::io::stdout().is_terminal() {
            re_log::debug!("Ignored call to stdout() because stdout is a terminal");
            return self.buffered();
        }

        let (enabled, store_info, properties, batcher_config, batcher_hooks) = self.into_args();

        if enabled {
            RecordingStream::new(
                store_info,
                properties,
                batcher_config,
                batcher_hooks,
                Box::new(crate::sink::FileSink::stdout()?),
            )
        } else {
            re_log::debug!("Rerun disabled - call to stdout() ignored");
            Ok(RecordingStream::disabled())
        }
    }

    /// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
    /// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over gRPC.
    ///
    /// If a Rerun Viewer is already listening on the default port, the stream will be redirected to
    /// that viewer instead of starting a new one.
    ///
    /// See also [`Self::spawn_opts`] if you wish to configure the behavior of the Rerun process
    /// as well as the underlying connection.
    ///
    /// ## Example
    ///
    /// ```no_run
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").spawn()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn spawn(self) -> RecordingStreamResult<RecordingStream> {
        self.spawn_opts(&Default::default())
    }

    /// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
    /// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over gRPC.
    ///
    /// If a Rerun Viewer is already listening on the configured port, the stream will be redirected
    /// to that viewer instead of starting a new one.
    ///
    /// The behavior of the spawned Viewer can be configured via `opts`.
    /// If you're fine with the default behavior, refer to the simpler [`Self::spawn`].
    ///
    /// ## Example
    ///
    /// ```no_run
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
    ///     .spawn_opts(&re_sdk::SpawnOptions::default())?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn spawn_opts(self, opts: &crate::SpawnOptions) -> RecordingStreamResult<RecordingStream> {
        if !self.is_enabled() {
            re_log::debug!("Rerun disabled - call to spawn() ignored");
            return Ok(RecordingStream::disabled());
        }

        let url = format!("rerun+http://{}/proxy", opts.connect_addr());

        // NOTE: If `_RERUN_TEST_FORCE_SAVE` is set, all recording streams will write to disk no matter
        // what, thus spawning a viewer is pointless (and probably not intended).
        if forced_sink_path().is_some() {
            return self.connect_grpc_opts(url);
        }

        crate::spawn(opts)?;

        self.connect_grpc_opts(url)
    }

    /// Returns whether or not logging is enabled, the [`StoreInfo`], the optional
    /// [`RecordingInfo`], the batcher configuration, and the batcher hooks.
    ///
    /// This can be used to then construct a [`RecordingStream`] manually using
    /// [`RecordingStream::new`].
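    ///
    /// A sketch of such a manual construction, mirroring what [`Self::buffered`] does
    /// internally (the `BufferedSink` path is an assumption; any [`LogSink`] works):
    ///
    /// ```ignore
    /// let (enabled, store_info, recording_info, batcher_config, batcher_hooks) =
    ///     re_sdk::RecordingStreamBuilder::new("rerun_example_app").into_args();
    /// let rec = if enabled {
    ///     re_sdk::RecordingStream::new(
    ///         store_info,
    ///         recording_info,
    ///         batcher_config,
    ///         batcher_hooks,
    ///         Box::new(re_sdk::log_sink::BufferedSink::new()),
    ///     )?
    /// } else {
    ///     re_sdk::RecordingStream::disabled()
    /// };
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```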
    pub fn into_args(
        self,
    ) -> (
        bool,
        StoreInfo,
        Option<RecordingInfo>,
        Option<ChunkBatcherConfig>,
        BatcherHooks,
    ) {
        let enabled = self.is_enabled();

        let Self {
            application_id,
            store_kind,
            recording_id,
            store_source,
            default_enabled: _,
            enabled: _,
            batcher_config,
            batcher_hooks,
            should_send_properties,
            recording_info,
        } = self;

        let store_id = StoreId::new(
            store_kind,
            application_id,
            recording_id.unwrap_or_else(RecordingId::random),
        );
        let store_source = store_source.unwrap_or_else(|| StoreSource::RustSdk {
            rustc_version: env!("RE_BUILD_RUSTC_VERSION").into(),
            llvm_version: env!("RE_BUILD_LLVM_VERSION").into(),
        });

        let store_info = StoreInfo::new(store_id, store_source);

        (
            enabled,
            store_info,
            should_send_properties.then_some(recording_info),
            batcher_config,
            batcher_hooks,
        )
    }

    /// Internal check for whether or not logging is enabled using explicit/default settings & env var.
    fn is_enabled(&self) -> bool {
        self.enabled
            .unwrap_or_else(|| crate::decide_logging_enabled(self.default_enabled))
    }
}

// ----------------------------------------------------------------------------

/// A [`RecordingStream`] handles everything related to logging data into Rerun.
///
/// You can construct a new [`RecordingStream`] using [`RecordingStreamBuilder`] or
/// [`RecordingStream::new`].
///
/// ## Sinks
///
/// Data is logged into Rerun via [`LogSink`]s.
///
/// The underlying [`LogSink`] of a [`RecordingStream`] can be changed at any point during its
/// lifetime by calling [`RecordingStream::set_sink`] or one of the higher level helpers
/// ([`RecordingStream::connect_grpc`], [`RecordingStream::memory`],
/// [`RecordingStream::save`], [`RecordingStream::disconnect`]).
///
/// See [`RecordingStream::set_sink`] for more information.
///
/// ## Multithreading and ordering
///
/// [`RecordingStream`] can be cheaply cloned and used freely across any number of threads.
///
/// Internally, all operations are linearized into a pipeline:
/// - All operations sent by a given thread will take effect in the same exact order as that
///   thread originally sent them in, from its point of view.
/// - There isn't any well defined global order across multiple threads.
///
/// This means that e.g. flushing the pipeline ([`Self::flush_blocking`]) guarantees that all previous data sent by the calling thread
/// has been recorded and (if applicable) flushed to the underlying OS-managed file descriptor,
/// but other threads may still have data in flight.
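///
/// A sketch of the per-thread guarantee (the `flush_blocking` signature used here is
/// an assumption):
///
/// ```ignore
/// let rec2 = rec.clone();
/// std::thread::spawn(move || {
///     // This thread's data is ordered relative to this thread only.
///     rec2.log("worker/points", &rerun::Points3D::new([(0.0, 0.0, 0.0)])).ok();
/// });
/// // Guarantees that everything logged by *this* thread so far has been handed off;
/// // the worker thread above may still have data in flight.
/// rec.flush_blocking(std::time::Duration::from_secs(5))?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```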
///
/// ## Shutdown
///
/// The [`RecordingStream`] can only be shutdown by dropping all instances of it, at which point
/// it will automatically take care of flushing any pending data that might remain in the pipeline.
///
/// Shutting down cannot ever block.
#[derive(Clone)]
pub struct RecordingStream {
    inner: Either<Arc<RecordingStreamInner>, Weak<RecordingStreamInner>>,
}

impl RecordingStream {
    /// Passes a reference to the [`RecordingStreamInner`], if it exists.
    ///
    /// This works whether the underlying stream is strong or weak.
    #[inline]
    fn with<F: FnOnce(&RecordingStreamInner) -> R, R>(&self, f: F) -> Option<R> {
        match &self.inner {
            Either::Left(strong) => Some(f(strong)),
            Either::Right(weak) => Some(f(&*weak.upgrade()?)),
        }
    }

    /// Clones the [`RecordingStream`] without incrementing the refcount.
    ///
    /// Useful e.g. if you want to make sure that a detached thread won't prevent the [`RecordingStream`]
    /// from flushing during shutdown.
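    ///
    /// A sketch (illustrative; the `TextLog` archetype is an assumption from the
    /// `rerun` crate):
    ///
    /// ```ignore
    /// let weak = rec.clone_weak();
    /// std::thread::spawn(move || {
    ///     // If all strong handles are dropped, this silently becomes a no-op
    ///     // instead of keeping the recording alive.
    ///     weak.log("worker/status", &rerun::TextLog::new("hello")).ok();
    /// });
    /// ```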
    //
    // TODO(#5335): shutdown flushing behavior is too brittle.
    #[inline]
    pub fn clone_weak(&self) -> Self {
        Self {
            inner: match &self.inner {
                Either::Left(strong) => Either::Right(Arc::downgrade(strong)),
                Either::Right(weak) => Either::Right(Weak::clone(weak)),
            },
        }
    }

    /// Returns the current reference count of the [`RecordingStream`].
    ///
    /// Returns 0 if the stream was created by [`RecordingStream::disabled()`],
    /// or if it is a [`clone_weak()`][Self::clone_weak] of a stream whose strong instances
    /// have all been dropped.
    pub fn ref_count(&self) -> usize {
        match &self.inner {
            Either::Left(strong) => Arc::strong_count(strong),
            Either::Right(weak) => weak.strong_count(),
        }
    }
}

// TODO(#5335): shutdown flushing behavior is too brittle.
impl Drop for RecordingStream {
    #[inline]
    fn drop(&mut self) {
        // If this holds the last strong handle to the recording, make sure that all pending
        // `DataLoader` threads that were started from the SDK actually run to completion (they
        // all hold a weak handle to this very recording!).
        //
        // NOTE: It's very important to do so from the `Drop` implementation of `RecordingStream`
        // itself, because the dataloader threads -- by definition -- will have to send data into
        // this very recording, therefore we must make sure that at least one strong handle still lives
        // on until they are all finished.
        if let Either::Left(strong) = &mut self.inner
            && Arc::strong_count(strong) == 1
        {
            // Keep the recording alive until all dataloaders are finished.
            self.with(|inner| inner.wait_for_dataloaders());
        }
    }
}

struct RecordingStreamInner {
    store_info: StoreInfo,
    recording_info: Option<RecordingInfo>,
    tick: AtomicI64,

    /// The one and only entrypoint into the pipeline: this is _never_ cloned nor publicly exposed,
    /// therefore the `Drop` implementation is guaranteed that no more data can come in while it's
    /// running.
    cmds_tx: Sender<Command>,

    batcher: ChunkBatcher,
    batcher_to_sink_handle: Option<std::thread::JoinHandle<()>>,
    /// If true, any new sink will update the batcher's configuration (as far as possible).
    sink_dependent_batcher_config: bool,

    /// Keeps track of the top-level threads that were spawned in order to execute the `DataLoader`
    /// machinery in the context of this `RecordingStream`.
    ///
    /// See [`RecordingStream::log_file_from_path`] and [`RecordingStream::log_file_from_contents`].
    dataloader_handles: Mutex<Vec<std::thread::JoinHandle<()>>>,

    pid_at_creation: u32,
}

impl fmt::Debug for RecordingStreamInner {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RecordingStreamInner")
            .field("store_id", &self.store_info.store_id)
            .finish()
    }
}

impl Drop for RecordingStreamInner {
    fn drop(&mut self) {
        if self.is_forked_child() {
            re_log::error_once!(
                "Fork detected while dropping RecordingStreamInner. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
            );
            return;
        }

        self.wait_for_dataloaders();

        // NOTE: The command channel is private, if we're here, nothing is currently capable of
        // sending data down the pipeline.
        let timeout = Duration::MAX;
        if let Err(err) = self.batcher.flush_blocking(timeout) {
            re_log::error!("Failed to flush batcher: {err}");
        }
        self.cmds_tx.send(Command::PopPendingChunks).ok();
        self.cmds_tx.send(Command::Shutdown).ok();
        if let Some(handle) = self.batcher_to_sink_handle.take() {
            handle.join().ok();
        }
    }
}

fn resolve_batcher_config(
    batcher_config: Option<ChunkBatcherConfig>,
    sink: &dyn LogSink,
) -> ChunkBatcherConfig {
    if let Some(explicit_batcher_config) = batcher_config {
        explicit_batcher_config
    } else {
        let default_config = sink.default_batcher_config();
        default_config.apply_env().unwrap_or_else(|err| {
            re_log::error!("Failed to parse ChunkBatcherConfig from env: {}", err);
            default_config
        })
    }
}

impl RecordingStreamInner {
    fn new(
        store_info: StoreInfo,
        recording_info: Option<RecordingInfo>,
        batcher_config: Option<ChunkBatcherConfig>,
        batcher_hooks: BatcherHooks,
        sink: Box<dyn LogSink>,
    ) -> RecordingStreamResult<Self> {
        let sink_dependent_batcher_config = batcher_config.is_none();
        let batcher_config = resolve_batcher_config(batcher_config, &*sink);

        let on_release = batcher_hooks.on_release.clone();
        let batcher = ChunkBatcher::new(batcher_config, batcher_hooks)?;

        {
            re_log::debug!(
                store_id = ?store_info.store_id,
                "Setting StoreInfo",
            );
            sink.send(
                re_log_types::SetStoreInfo {
                    row_id: *RowId::new(),
                    info: store_info.clone(),
                }
                .into(),
            );
        }

        let (cmds_tx, cmds_rx) = crossbeam::channel::unbounded();

        let batcher_to_sink_handle = {
            const NAME: &str = "RecordingStream::batcher_to_sink";
            std::thread::Builder::new()
                .name(NAME.into())
                .spawn({
                    let info = store_info.clone();
                    let batcher = batcher.clone();
                    move || forwarding_thread(info, sink, cmds_rx, batcher.chunks(), on_release)
                })
                .map_err(|err| RecordingStreamError::SpawnThread {
                    name: NAME.into(),
                    err,
                })?
        };

        if let Some(recording_info) = recording_info.as_ref() {
            // We pre-populate the batcher with a chunk that contains the `RecordingInfo`,
            // so that it gets automatically sent to the sink.

            re_log::trace!(recording_info = ?recording_info, "Adding RecordingInfo to batcher");

            let chunk = Chunk::builder(EntityPath::properties())
                .with_archetype(RowId::new(), TimePoint::default(), recording_info)
                .build()?;

            batcher.push_chunk(chunk);
        }

        Ok(Self {
            store_info,
            recording_info,
            tick: AtomicI64::new(0),
            cmds_tx,
            batcher,
            batcher_to_sink_handle: Some(batcher_to_sink_handle),
            sink_dependent_batcher_config,
            dataloader_handles: Mutex::new(Vec::new()),
            pid_at_creation: std::process::id(),
        })
    }

    #[inline]
    pub fn is_forked_child(&self) -> bool {
        self.pid_at_creation != std::process::id()
    }

    /// Make sure all pending top-level `DataLoader` threads that were started from the SDK run to completion.
    //
    // TODO(cmc): At some point we might want to make it configurable, though I cannot really
    // think of a use case where you'd want to drop those threads immediately upon
    // disconnection.
    fn wait_for_dataloaders(&self) {
        let dataloader_handles = std::mem::take(&mut *self.dataloader_handles.lock());
        for handle in dataloader_handles {
            handle.join().ok();
        }
    }
}

type InspectSinkFn = Box<dyn FnOnce(&dyn LogSink) + Send + 'static>;

type FlushResult = Result<(), SinkFlushError>;

enum Command {
    RecordMsg(LogMsg),
    SwapSink {
        new_sink: Box<dyn LogSink>,
        timeout: Duration,
    },
    // TODO(#10444): This should go away with more explicit sinks.
    InspectSink(InspectSinkFn),
    Flush {
        on_done: Sender<FlushResult>,
        timeout: Duration,
    },
    PopPendingChunks,
    Shutdown,
}

impl Command {
    fn flush(timeout: Duration) -> (Self, Receiver<FlushResult>) {
        let (on_done, rx) = crossbeam::channel::bounded(1); // oneshot
        (Self::Flush { on_done, timeout }, rx)
    }
}

impl RecordingStream {
    /// Creates a new [`RecordingStream`] with a given [`StoreInfo`] and [`LogSink`].
    ///
    /// You can create a [`StoreInfo`] with [`crate::new_store_info`].
    ///
    /// The [`StoreInfo`] is immediately sent to the sink in the form of a
    /// [`re_log_types::SetStoreInfo`].
    ///
    /// You can find sinks in [`crate::sink`].
    ///
    /// If no batcher configuration is provided, the default batcher configuration for the sink will be used.
    /// Any environment variables as specified in [`ChunkBatcherConfig`] will always override respective settings.
    ///
    /// See also: [`RecordingStreamBuilder`].
    #[must_use = "Recording will get closed automatically once all instances of this object have been dropped"]
    pub fn new(
        store_info: StoreInfo,
        recording_info: Option<RecordingInfo>,
        batcher_config: Option<ChunkBatcherConfig>,
        batcher_hooks: BatcherHooks,
        sink: Box<dyn LogSink>,
    ) -> RecordingStreamResult<Self> {
        let sink = store_info
            .store_id
            .is_recording()
            .then(forced_sink_path)
            .flatten()
            .map_or(sink, |path| {
                re_log::info!("Forcing FileSink because of env-var {ENV_FORCE_SAVE}={path:?}");
                Box::new(
                    crate::sink::FileSink::new(path)
                        .expect("Failed to create FileSink for forced test path"),
                ) as Box<dyn LogSink>
            });

        let stream = RecordingStreamInner::new(
            store_info,
            recording_info,
            batcher_config,
            batcher_hooks,
            sink,
        )
        .map(|inner| Self {
            inner: Either::Left(Arc::new(inner)),
        })?;

        Ok(stream)
    }

    /// Creates a new no-op [`RecordingStream`] that drops all logging messages, doesn't allocate
    /// any memory and doesn't spawn any threads.
    ///
    /// [`Self::is_enabled`] will return `false`.
    pub const fn disabled() -> Self {
        Self {
            inner: Either::Right(Weak::new()),
        }
    }
}

impl RecordingStream {
    /// Log data to Rerun.
    ///
    /// This is the main entry point for logging data to rerun. It can be used to log anything
    /// that implements [`AsComponents`], such as any [archetype](https://docs.rs/rerun/latest/rerun/archetypes/index.html)
    /// or individual [component](https://docs.rs/rerun/latest/rerun/components/index.html).
    ///
    /// The data will be timestamped automatically based on the [`RecordingStream`]'s internal clock.
    /// See [`RecordingStream::set_time_sequence`] etc. for more information.
    ///
    /// The entity path can either be a string
    /// (with special characters escaped, split on unescaped slashes)
    /// or an [`EntityPath`] constructed with [`crate::entity_path`].
    /// See <https://www.rerun.io/docs/concepts/entity-path> for more on entity paths.
    ///
    /// See also: [`Self::log_static`] for logging static data.
    ///
    /// Internally, the stream will automatically micro-batch multiple log calls to optimize
    /// transport.
    /// See [SDK Micro Batching] for more information.
    ///
    /// # Example:
    /// ```ignore
    /// # use rerun;
    /// # let (rec, storage) = rerun::RecordingStreamBuilder::new("rerun_example_points3d_simple").memory()?;
    /// rec.log(
    ///     "my/points",
    ///     &rerun::Points3D::new([(0.0, 0.0, 0.0), (1.0, 1.0, 1.0)]),
    /// )?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    ///
    /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
    /// [component bundle]: [`AsComponents`]
    #[inline]
    pub fn log<AS: ?Sized + AsComponents>(
        &self,
        ent_path: impl Into<EntityPath>,
        as_components: &AS,
    ) -> RecordingStreamResult<()> {
        self.log_with_static(ent_path, false, as_components)
    }

    /// Lower-level logging API to provide data spanning multiple timepoints.
    ///
    /// Unlike the regular `log` API, which is row-oriented, this API lets you submit the data
    /// in a columnar form. The lengths of all of the [`TimeColumn`]s and the component columns
    /// must match. All data that occurs at the same index across the different index/time and
    /// component arrays will act as a single logical row.
    ///
    /// Note that this API ignores any stateful index/time set on the log stream via the
    /// [`Self::set_time`]/[`Self::set_timepoint`]/etc. APIs.
    /// Furthermore, this will _not_ inject the default `log_tick` and `log_time` timeline columns.
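    ///
    /// A sketch of a single columnar call (the `Scalars` archetype and its
    /// `columns_of_unit_batches` helper are assumptions about the `rerun` crate):
    ///
    /// ```ignore
    /// let steps = re_chunk::TimeColumn::new_sequence("step", 0..64);
    /// let scalars = (0..64).map(|v| (v as f64 / 10.0).sin());
    /// rec.send_columns(
    ///     "scalars",
    ///     [steps],
    ///     rerun::Scalars::new(scalars).columns_of_unit_batches()?,
    /// )?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```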
    pub fn send_columns(
        &self,
        ent_path: impl Into<EntityPath>,
        indexes: impl IntoIterator<Item = TimeColumn>,
        columns: impl IntoIterator<Item = SerializedComponentColumn>,
    ) -> RecordingStreamResult<()> {
        let id = ChunkId::new();

        let indexes = indexes
            .into_iter()
            .map(|col| (*col.timeline().name(), col))
            .collect();

        let components: ChunkComponents = columns
            .into_iter()
            .map(|column| (column.descriptor, column.list_array))
            .collect();

        let chunk = Chunk::from_auto_row_ids(id, ent_path.into(), indexes, components)?;

        self.send_chunk(chunk);

        Ok(())
    }

    /// Log static data to Rerun.
    ///
    /// It can be used to log anything
    /// that implements [`AsComponents`], such as any [archetype](https://docs.rs/rerun/latest/rerun/archetypes/index.html)
    /// or individual [component](https://docs.rs/rerun/latest/rerun/components/index.html).
    ///
    /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
    /// any temporal data of the same type.
    /// All timestamp data associated with this message will be dropped right before sending it to Rerun.
    ///
    /// This is most often used for [`rerun::ViewCoordinates`](https://docs.rs/rerun/latest/rerun/archetypes/struct.ViewCoordinates.html) and
    /// [`rerun::AnnotationContext`](https://docs.rs/rerun/latest/rerun/archetypes/struct.AnnotationContext.html).
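    ///
    /// A sketch (the `ViewCoordinates` constructor name is an assumption):
    ///
    /// ```ignore
    /// // Applies on all timelines and shadows any temporal data of the same type.
    /// rec.log_static("world", &rerun::ViewCoordinates::RIGHT_HAND_Z_UP())?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```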
    ///
    /// Internally, the stream will automatically micro-batch multiple log calls to optimize
    /// transport.
    /// See [SDK Micro Batching] for more information.
    ///
    /// See also [`Self::log`].
    ///
    /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
    /// [component bundle]: [`AsComponents`]
    #[inline]
    pub fn log_static<AS: ?Sized + AsComponents>(
        &self,
        ent_path: impl Into<EntityPath>,
        as_components: &AS,
    ) -> RecordingStreamResult<()> {
        self.log_with_static(ent_path, true, as_components)
    }

    /// Logs the contents of a [component bundle] into Rerun.
    ///
    /// If `static_` is set to `true`, all timestamp data associated with this message will be
    /// dropped right before sending it to Rerun.
    /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
    /// any temporal data of the same type.
    ///
    /// Otherwise, the data will be timestamped automatically based on the [`RecordingStream`]'s
    /// internal clock.
    /// See `RecordingStream::set_time_*` family of methods for more information.
    ///
    /// The entity path can either be a string
    /// (with special characters escaped, split on unescaped slashes)
    /// or an [`EntityPath`] constructed with [`crate::entity_path`].
    /// See <https://www.rerun.io/docs/concepts/entity-path> for more on entity paths.
    ///
    /// Internally, the stream will automatically micro-batch multiple log calls to optimize
    /// transport.
    /// See [SDK Micro Batching] for more information.
    ///
    /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
    /// [component bundle]: [`AsComponents`]
    #[inline]
    pub fn log_with_static<AS: ?Sized + AsComponents>(
        &self,
        ent_path: impl Into<EntityPath>,
        static_: bool,
        as_components: &AS,
    ) -> RecordingStreamResult<()> {
        let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
        self.log_serialized_batches_impl(
            row_id,
            ent_path,
            static_,
            as_components.as_serialized_batches(),
        )
    }

    /// Logs a set of [`SerializedComponentBatch`]es into Rerun.
    ///
    /// If `static_` is set to `true`, all timestamp data associated with this message will be
    /// dropped right before sending it to Rerun.
    /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
    /// any temporal data of the same type.
    ///
    /// Otherwise, the data will be timestamped automatically based on the [`RecordingStream`]'s
    /// internal clock.
    /// See `RecordingStream::set_time_*` family of methods for more information.
    ///
    /// The number of instances will be determined by the longest batch in the bundle.
    ///
    /// The entity path can either be a string
    /// (with special characters escaped, split on unescaped slashes)
    /// or an [`EntityPath`] constructed with [`crate::entity_path`].
    /// See <https://www.rerun.io/docs/concepts/entity-path> for more on entity paths.
    ///
    /// Internally, the stream will automatically micro-batch multiple log calls to optimize
    /// transport.
    /// See [SDK Micro Batching] for more information.
    ///
    /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
    ///
    /// [`SerializedComponentBatch`]: [re_types_core::SerializedComponentBatch]
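    ///
    /// A sketch, serializing an archetype by hand first (relies on
    /// [`AsComponents::as_serialized_batches`]; the `Points3D` archetype is assumed
    /// from the `rerun` crate):
    ///
    /// ```ignore
    /// use re_sdk_types::AsComponents as _;
    /// let batches = rerun::Points3D::new([(0.0, 0.0, 0.0)]).as_serialized_batches();
    /// rec.log_serialized_batches("my/points", false, batches)?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```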
    pub fn log_serialized_batches(
        &self,
        ent_path: impl Into<EntityPath>,
        static_: bool,
        comp_batches: impl IntoIterator<Item = re_sdk_types::SerializedComponentBatch>,
    ) -> RecordingStreamResult<()> {
        let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
        self.log_serialized_batches_impl(row_id, ent_path, static_, comp_batches)
    }

    /// Sends a property to the recording.
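    ///
    /// A sketch (`episode` is an arbitrary property name; any [`AsComponents`] value
    /// can be used, `TextDocument` is assumed from the `rerun` crate):
    ///
    /// ```ignore
    /// rec.send_property("episode", &rerun::TextDocument::new("run-42"))?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```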
    #[inline]
    pub fn send_property<AS: ?Sized + AsComponents>(
        &self,
        name: impl Into<String>,
        values: &AS,
    ) -> RecordingStreamResult<()> {
        let sub_path = EntityPath::from(name.into());
        self.log_static(EntityPath::properties().join(&sub_path), values)
    }

    /// Sends the name of the recording.
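    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// rec.send_recording_name("my experiment")?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```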
    #[inline]
    pub fn send_recording_name(&self, name: impl Into<String>) -> RecordingStreamResult<()> {
        let update = RecordingInfo::update_fields().with_name(name.into());
        self.log_static(EntityPath::properties(), &update)
    }

    /// Sends the start time of the recording.
    #[inline]
    pub fn send_recording_start_time(
        &self,
        timestamp: impl Into<Timestamp>,
    ) -> RecordingStreamResult<()> {
        let update = RecordingInfo::update_fields().with_start_time(timestamp.into());
        self.log_static(EntityPath::properties(), &update)
    }

    // NOTE: For backward and forward compatibility reasons, we need our logging APIs to be
    // fallible, even though they really aren't at the moment.
    #[expect(clippy::unnecessary_wraps)]
    fn log_serialized_batches_impl(
        &self,
        row_id: RowId,
        entity_path: impl Into<EntityPath>,
        static_: bool,
        comp_batches: impl IntoIterator<Item = re_sdk_types::SerializedComponentBatch>,
    ) -> RecordingStreamResult<()> {
        if !self.is_enabled() {
            return Ok(()); // silently drop the message
        }

        let entity_path = entity_path.into();

        let components: IntMap<_, _> = comp_batches
            .into_iter()
            .map(|comp_batch| (comp_batch.descriptor.component, comp_batch))
            .collect();

        // NOTE: The timepoint is irrelevant, the `RecordingStream` will overwrite it using its
        // internal clock.
        let timepoint = TimePoint::default();

        if !components.is_empty() {
            let row = PendingRow {
                row_id,
                timepoint,
                components,
            };
            self.record_row(entity_path, row, !static_);
        }

        Ok(())
    }

    /// Logs the file at the given `path` using all [`re_data_loader::DataLoader`]s available.
    ///
    /// A single `path` might be handled by more than one loader.
    ///
    /// This method blocks until either at least one [`re_data_loader::DataLoader`] starts
    /// streaming data in or all of them fail.
    ///
    /// See <https://www.rerun.io/docs/reference/data-loaders/overview> for more information.
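    ///
    /// A sketch (`"data/points.ply"` is a placeholder path):
    ///
    /// ```ignore
    /// // No entity-path prefix, and the data is logged as temporal rather than static.
    /// rec.log_file_from_path("data/points.ply", None, false)?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```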
    #[cfg(feature = "data_loaders")]
    pub fn log_file_from_path(
        &self,
        filepath: impl AsRef<std::path::Path>,
        entity_path_prefix: Option<EntityPath>,
        static_: bool,
    ) -> RecordingStreamResult<()> {
        self.log_file(filepath, None, entity_path_prefix, static_, true)
    }

    /// Logs the given `contents` using all [`re_data_loader::DataLoader`]s available.
    ///
    /// The contents may be handled by more than one loader, based on the given `filepath`.
    ///
    /// This method blocks until either at least one [`re_data_loader::DataLoader`] starts
    /// streaming data in or all of them fail.
    ///
    /// See <https://www.rerun.io/docs/reference/data-loaders/overview> for more information.
    #[cfg(feature = "data_loaders")]
    pub fn log_file_from_contents(
        &self,
        filepath: impl AsRef<std::path::Path>,
        contents: std::borrow::Cow<'_, [u8]>,
        entity_path_prefix: Option<EntityPath>,
        static_: bool,
    ) -> RecordingStreamResult<()> {
        self.log_file(filepath, Some(contents), entity_path_prefix, static_, true)
    }

    /// If `prefer_current_recording` is set (which is always the case for now), the dataloader settings
    /// will be configured as if the current SDK recording is the currently opened recording.
    /// Most dataloaders prefer logging to the currently opened recording if one is set.
    #[cfg(feature = "data_loaders")]
    fn log_file(
        &self,
        filepath: impl AsRef<std::path::Path>,
        contents: Option<std::borrow::Cow<'_, [u8]>>,
        entity_path_prefix: Option<EntityPath>,
        static_: bool,
        prefer_current_recording: bool,
    ) -> RecordingStreamResult<()> {
        let Some(store_info) = self.store_info().clone() else {
            re_log::warn!(
                "Ignored call to log_file() because RecordingStream has not been properly initialized"
            );
            return Ok(());
        };

        let filepath = filepath.as_ref();
        let has_contents = contents.is_some();

        let (tx, rx) =
            re_log_channel::log_channel(re_log_channel::LogSource::File(filepath.into()));

        let mut settings = crate::DataLoaderSettings {
            application_id: Some(store_info.application_id().clone()),
            recording_id: store_info.recording_id().clone(),
            opened_store_id: None,
            force_store_info: false,
            entity_path_prefix,
            timepoint: (!static_).then(|| {
                self.with(|inner| {
                    // Get the current time on all timelines, for the current recording, on the current
                    // thread…
                    let mut now = self.now();

                    // …and then also inject the current recording tick into it.
                    let tick = inner
                        .tick
                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    now.insert_cell(TimelineName::log_tick(), TimeCell::from_sequence(tick));

                    now
                })
                .unwrap_or_default()
            }),
        };

        if prefer_current_recording {
            settings.opened_store_id = Some(store_info.store_id);
        }

        if let Some(contents) = contents {
            re_data_loader::load_from_file_contents(
                &settings,
                re_log_types::FileSource::Sdk,
                filepath,
                contents,
                &tx,
            )?;
        } else {
            re_data_loader::load_from_path(
                &settings,
                re_log_types::FileSource::Sdk,
                filepath,
                &tx,
            )?;
        }
        drop(tx);

        // We can safely ignore the error on `recv()` as we're in complete control of both ends of
        // the channel.
        let thread_name = if has_contents {
            format!("log_file_from_contents({filepath:?})")
        } else {
            format!("log_file_from_path({filepath:?})")
        };
        let handle = std::thread::Builder::new()
            .name(thread_name.clone())
            .spawn({
                let this = self.clone_weak();
                move || {
                    while let Some(msg) = rx.recv().ok().and_then(|msg| msg.into_data()) {
                        match msg {
                            re_log_channel::DataSourceMessage::LogMsg(log_msg) => {
                                this.record_msg(log_msg);
                            }
                            unsupported => {
                                re_log::error_once!(
                                    "Ignoring unexpected {} in file",
                                    unsupported.variant_name()
                                );
                            }
                        }
                    }
                }
            })
            .map_err(|err| RecordingStreamError::SpawnThread {
                name: thread_name,
                err,
            })?;

        self.with(|inner| inner.dataloader_handles.lock().push(handle));

        Ok(())
    }
}
1437
1438#[expect(clippy::needless_pass_by_value)]
1439fn forwarding_thread(
1440    store_info: StoreInfo,
1441    mut sink: Box<dyn LogSink>,
1442    cmds_rx: Receiver<Command>,
1443    chunks: Receiver<Chunk>,
1444    on_release: Option<ArrowRecordBatchReleaseCallback>,
1445) {
1446    /// Returns `true` to indicate that processing can continue; i.e. `false` means immediate
1447    /// shutdown.
1448    fn handle_cmd(store_info: &StoreInfo, cmd: Command, sink: &mut Box<dyn LogSink>) -> bool {
1449        match cmd {
1450            Command::RecordMsg(msg) => {
1451                sink.send(msg);
1452            }
1453            Command::SwapSink { new_sink, timeout } => {
1454                re_log::trace!("Swapping sink…");
1455
1456                let backlog = {
1457                    // Capture the backlog if it exists.
1458                    let backlog = sink.drain_backlog();
1459
1460                    // Flush the underlying sink if possible.
1461                    if let Err(err) = sink.flush_blocking(timeout) {
1462                        re_log::error!("Failed to flush previous sink: {err}");
1463                    }
1464
1465                    backlog
1466                };
1467
1468                // Send the recording info to the new sink. This is idempotent.
1469                {
1470                    re_log::debug!(
1471                        store_id = ?store_info.store_id,
1472                        "Setting StoreInfo",
1473                    );
1474                    new_sink.send(
1475                        re_log_types::SetStoreInfo {
1476                            row_id: *RowId::new(),
1477                            info: store_info.clone(),
1478                        }
1479                        .into(),
1480                    );
1481                    new_sink.send_all(backlog);
1482                }
1483
1484                *sink = new_sink;
1485            }
1486            Command::InspectSink(f) => {
1487                f(sink.as_ref());
1488            }
1489            Command::Flush { on_done, timeout } => {
1490                re_log::trace!("Flushing…");
1491
1492                let result = sink.flush_blocking(timeout);
1493
1494                // Send back the result:
1495                if let Err(crossbeam::channel::SendError(result)) = on_done.send(result)
1496                    && let Err(err) = result
1497                {
1498                    // There was an error, and nobody received it:
1499                    re_log::error!("Failed to flush sink: {err}");
1500                }
1501            }
1502            Command::PopPendingChunks => {
1503                // Wake up and skip the current iteration so that we can drain all pending chunks
1504                // before handling the next command.
1505            }
1506            Command::Shutdown => return false,
1507        }
1508
1509        true
1510    }
1511
1512    use crossbeam::select;
1513    loop {
1514        // NOTE: Always pop chunks first, this is what makes `Command::PopPendingChunks` possible,
1515        // which in turn makes `RecordingStream::flush_blocking` well defined.
1516        while let Ok(chunk) = chunks.try_recv() {
1517            let mut msg = match chunk.to_arrow_msg() {
1518                Ok(chunk) => chunk,
1519                Err(err) => {
1520                    re_log::error!(%err, "couldn't serialize chunk; data dropped (this is a bug in Rerun!)");
1521                    continue;
1522                }
1523            };
1524            msg.on_release = on_release.clone();
1525            sink.send(LogMsg::ArrowMsg(store_info.store_id.clone(), msg));
1526        }
1527
1528        select! {
1529            recv(chunks) -> res => {
1530                let Ok(chunk) = res else {
1531                    // The batcher is gone, which can only happen if the `RecordingStream` itself
1532                    // has been dropped.
1533                    re_log::trace!("Shutting down forwarding_thread: batcher is gone");
1534                    break;
1535                };
1536
1537                let msg = match chunk.to_arrow_msg() {
1538                    Ok(chunk) => chunk,
1539                    Err(err) => {
1540                        re_log::error!(%err, "couldn't serialize chunk; data dropped (this is a bug in Rerun!)");
1541                        continue;
1542                    }
1543                };
1544
1545                sink.send(LogMsg::ArrowMsg(store_info.store_id.clone(), msg));
1546            }
1547
1548            recv(cmds_rx) -> res => {
1549                let Ok(cmd) = res else {
1550                    // All command senders are gone, which can only happen if the
1551                    // `RecordingStream` itself has been dropped.
1552                    re_log::trace!("Shutting down forwarding_thread: all command senders are gone");
1553                    break;
1554                };
1555                if !handle_cmd(&store_info, cmd, &mut sink) {
1556                    break; // shutdown
1557                }
1558            }
1559        }
1560
1561        // NOTE: The receiving end of the command stream is owned solely by this thread.
1562        // Past this point, all command writes will return `ErrDisconnected`.
1563    }
1564}
1565
1566impl RecordingStream {
1567    /// Check if logging is enabled on this `RecordingStream`.
1568    ///
1569    /// If not, all recording calls will be ignored.
1570    #[inline]
1571    pub fn is_enabled(&self) -> bool {
1572        self.with(|_| true).unwrap_or(false)
1573    }
1574
1575    /// The [`StoreInfo`] associated with this `RecordingStream`.
1576    #[inline]
1577    pub fn store_info(&self) -> Option<StoreInfo> {
1578        self.with(|inner| inner.store_info.clone())
1579    }
1580
1581    /// Determine whether a fork has happened since creating this `RecordingStream`. In general, this means our
1582    /// batcher/sink threads are gone and all data logged since the fork has been dropped.
1583    ///
1584    /// It is essential that [`crate::cleanup_if_forked_child`] be called after forking the process.
1585    /// SDK implementations should do this during their initialization phase.
1586    #[inline]
1587    pub fn is_forked_child(&self) -> bool {
1588        self.with(|inner| inner.is_forked_child()).unwrap_or(false)
1589    }
1590}
1591
1592impl RecordingStream {
1593    /// Records an arbitrary [`LogMsg`].
1594    #[inline]
1595    pub fn record_msg(&self, msg: LogMsg) {
1596        let f = move |inner: &RecordingStreamInner| {
1597            // NOTE: Internal channels can never be closed outside of the `Drop` impl,
1598            // so this send cannot fail.
1599            inner.cmds_tx.send(Command::RecordMsg(msg)).ok();
1600            inner
1601                .tick
1602                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1603        };
1604
1605        if self.with(f).is_none() {
1606            re_log::warn_once!("Recording disabled - call to record_msg() ignored");
1607        }
1608    }
1609
1610    /// Records a single [`PendingRow`].
1611    ///
1612    /// If `inject_time` is set to `true`, the row's timestamp data will be overridden using the
1613    /// [`RecordingStream`]'s internal clock.
1614    ///
1615    /// Internally, incoming [`PendingRow`]s are automatically coalesced into larger [`Chunk`]s to
1616    /// optimize for transport.
1617    #[inline]
1618    pub fn record_row(&self, entity_path: EntityPath, mut row: PendingRow, inject_time: bool) {
1619        let f = move |inner: &RecordingStreamInner| {
1620            // NOTE: The tick is incremented even when `inject_time` is false.
1621            let tick = inner
1622                .tick
1623                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1624            if inject_time {
1625                // Get the current time on all timelines, for the current recording, on the current
1626                // thread…
1627                let mut now = self.now();
1628                // …and then also inject the current recording tick into it.
1629                now.insert_cell(TimelineName::log_tick(), TimeCell::from_sequence(tick));
1630
1631                // Inject all these times into the row, overriding conflicting times, if any.
1632                for (timeline, cell) in now {
1633                    row.timepoint.insert_cell(timeline, cell);
1634                }
1635            }
1636
1637            inner.batcher.push_row(entity_path, row);
1638        };
1639
1640        if self.with(f).is_none() {
1641            re_log::warn_once!("Recording disabled - call to record_row() ignored");
1642        }
1643    }
1644
1645    /// Logs a single [`Chunk`].
1646    ///
1647    /// Will inject `log_tick` and `log_time` timeline columns into the chunk.
1648    /// If you don't want to inject these, use [`Self::send_chunk`] instead.
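    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a [`Chunk`] was already built elsewhere:
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// # let chunk: re_chunk::Chunk = unimplemented!();
    /// // `log_chunk` stamps the chunk with `log_time` and `log_tick` before batching.
    /// rec.log_chunk(chunk);
    /// ```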
1649    #[inline]
1650    pub fn log_chunk(&self, mut chunk: Chunk) {
1651        let f = move |inner: &RecordingStreamInner| {
1652            // TODO(cmc): Repeating these values is pretty wasteful. Would be nice to have a way of
1653            // indicating these are fixed across the whole chunk.
1654            // Inject the log time
1655            {
1656                let time_timeline = Timeline::log_time();
1657                let time =
1658                    TimeInt::new_temporal(re_log_types::Timestamp::now().nanos_since_epoch());
1659
1660                let repeated_time = std::iter::repeat_n(time.as_i64(), chunk.num_rows()).collect();
1661
1662                let time_column = TimeColumn::new(Some(true), time_timeline, repeated_time);
1663
1664                if let Err(err) = chunk.add_timeline(time_column) {
1665                    re_log::error!(
1666                        "Couldn't inject '{}' timeline into chunk (this is a bug in Rerun!): {}",
1667                        time_timeline.name(),
1668                        err
1669                    );
1670                    return;
1671                }
1672            }
1673            // Inject the log tick
1674            {
1675                let tick_timeline = Timeline::log_tick();
1676
1677                let tick = inner
1678                    .tick
1679                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1680
1681                let repeated_tick = std::iter::repeat_n(tick, chunk.num_rows()).collect();
1682
1683                let tick_chunk = TimeColumn::new(Some(true), tick_timeline, repeated_tick);
1684
1685                if let Err(err) = chunk.add_timeline(tick_chunk) {
1686                    re_log::error!(
1687                        "Couldn't inject '{}' timeline into chunk (this is a bug in Rerun!): {}",
1688                        tick_timeline.name(),
1689                        err
1690                    );
1691                    return;
1692                }
1693            }
1694
1695            inner.batcher.push_chunk(chunk);
1696        };
1697
1698        if self.with(f).is_none() {
1699            re_log::warn_once!("Recording disabled - call to log_chunk() ignored");
1700        }
1701    }
1702
1703    /// Logs multiple [`Chunk`]s.
1704    ///
1705    /// Like [`Self::log_chunk`], this injects `log_tick` and `log_time` timeline columns
1706    /// into each chunk. If you don't want to inject these, use [`Self::send_chunks`] instead.
1707    pub fn log_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
1708        for chunk in chunks {
1709            self.log_chunk(chunk);
1710        }
1711    }
1712
1713    /// Records a single [`Chunk`].
1714    ///
1715    /// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunk;
1716    /// for that, use [`Self::log_chunk`] instead.
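    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a [`Chunk`] with fully specified timelines was built elsewhere:
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// # let chunk: re_chunk::Chunk = unimplemented!();
    /// // The chunk is forwarded as-is: no `log_time`/`log_tick` columns are added.
    /// rec.send_chunk(chunk);
    /// ```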
1717    #[inline]
1718    pub fn send_chunk(&self, chunk: Chunk) {
1719        let f = move |inner: &RecordingStreamInner| {
1720            inner.batcher.push_chunk(chunk);
1721        };
1722
1723        if self.with(f).is_none() {
1724            re_log::warn_once!("Recording disabled - call to send_chunk() ignored");
1725        }
1726    }
1727
1728    /// Records multiple [`Chunk`]s.
1729    ///
1730    /// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunks;
1731    /// for that, use [`Self::log_chunks`] instead.
1732    pub fn send_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
1733        for chunk in chunks {
1734            self.send_chunk(chunk);
1735        }
1736    }
1737
1738    /// Swaps the underlying sink for a new one.
1739    ///
1740    /// This guarantees that:
1741    /// 1. all pending rows and chunks are batched, collected and sent down the current sink,
1742    /// 2. the current sink is flushed if it has pending data in its buffers,
1743    /// 3. the current sink's backlog, if there's any, is forwarded to the new sink.
1744    ///
1745    /// When this function returns, the calling thread is guaranteed that all future record calls
1746    /// will end up in the new sink.
1747    ///
1748    /// If the batcher's configuration has not been set explicitly or by environment variables,
1749    /// this will change the batcher configuration to the sink's default configuration.
1750    ///
1751    /// ## Data loss
1752    ///
1753    /// If the current sink is in a broken state (e.g. a gRPC sink with a broken connection that
1754    /// cannot be repaired), all pending data in its buffers will be dropped.
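    ///
    /// # Example
    ///
    /// A minimal sketch that swaps to an in-memory buffered sink:
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// // All pending data is flushed into the old sink before the swap completes.
    /// rec.set_sink(Box::new(rerun::log_sink::BufferedSink::new()));
    /// ```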
1755    pub fn set_sink(&self, new_sink: Box<dyn LogSink>) {
1756        if self.is_forked_child() {
1757            re_log::error_once!(
1758                "Fork detected during set_sink. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
1759            );
1760            return;
1761        }
1762
1763        let timeout = Duration::MAX; // The background thread should block forever if necessary
1764
1765        let f = move |inner: &RecordingStreamInner| {
1766            // NOTE: Internal channels can never be closed outside of the `Drop` impl,
1767            // so all these sends are safe.
1768
1769            // Flush the batcher down the chunk channel
1770            if let Err(err) = inner.batcher.flush_blocking(timeout) {
1771                re_log::warn!("Failed to flush batcher in `set_sink`: {err}");
1772            }
1773
1774            // Receive pending chunks from the batcher's channel
1775            inner.cmds_tx.send(Command::PopPendingChunks).ok();
1776
1777            // Update the batcher's configuration if it's sink-dependent.
1778            if inner.sink_dependent_batcher_config {
1779                let batcher_config = resolve_batcher_config(None, &*new_sink);
1780                inner.batcher.update_config(batcher_config);
1781            }
1782
1783            // Swap the sink, which will internally make sure to re-ingest the backlog if needed
1784            inner
1785                .cmds_tx
1786                .send(Command::SwapSink { new_sink, timeout })
1787                .ok();
1788
1789            // Before we give control back to the caller, we need to make sure that the swap has
1790            // taken place: we don't want the user to send data to the old sink!
1791            re_log::trace!("Waiting for sink swap to complete…");
1792            let (cmd, oneshot) = Command::flush(timeout);
1793            inner.cmds_tx.send(cmd).ok();
1794            oneshot.recv().ok();
1795            re_log::trace!("Sink swap completed.");
1796        };
1797
1798        if self.with(f).is_none() {
1799            re_log::warn_once!("Recording disabled - call to set_sink() ignored");
1800        }
1801    }
1802
1803    /// Initiates a flush of the pipeline and returns immediately.
1804    ///
1805    /// This does **not** wait for the flush to propagate (see [`Self::flush_blocking`]).
1806    /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
1807    ///
1808    /// This will never return [`SinkFlushError::Timeout`].
1809    pub fn flush_async(&self) -> Result<(), SinkFlushError> {
1810        re_tracing::profile_function!();
1811        match self.flush(None) {
1812            Err(SinkFlushError::Timeout) => Ok(()),
1813            result => result,
1814        }
1815    }
1816
1817    /// Flushes the batching pipeline and waits for it to propagate.
1818    ///
1819    /// The function will block until either the flush has completed successfully (`Ok`),
1820    /// an error has occurred (`SinkFlushError::Failed`), or the timeout is reached (`SinkFlushError::Timeout`).
1821    ///
1822    /// Convenience for calling [`Self::flush_with_timeout`] with a timeout of [`Duration::MAX`].
1823    pub fn flush_blocking(&self) -> Result<(), SinkFlushError> {
1824        re_tracing::profile_function!();
1825        self.flush_with_timeout(Duration::MAX)
1826    }
1827
1828    /// Flushes the batching pipeline and waits for it to propagate, up to the given timeout.
1829    /// If you don't want a timeout, pass [`Duration::MAX`].
1830    ///
1831    /// The function will block until that timeout is reached,
1832    /// an error occurs, or the flush is complete.
1833    /// The function will only block while there is some hope of progress.
1834    /// For instance: if the underlying gRPC connection is disconnected (or never connected at all),
1835    /// then [`SinkFlushError::Failed`] is returned.
1836    ///
1837    /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
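    ///
    /// # Example
    ///
    /// A minimal sketch that tolerates slow sinks but gives up after five seconds:
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// match rec.flush_with_timeout(std::time::Duration::from_secs(5)) {
    ///     Ok(()) => {}
    ///     Err(err) => eprintln!("flush did not complete: {err}"),
    /// }
    /// ```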
1838    pub fn flush_with_timeout(&self, timeout: Duration) -> Result<(), SinkFlushError> {
1839        re_tracing::profile_function!();
1840        self.flush(Some(timeout))
1841    }
1842
1843    /// Flushes the batching pipeline and optionally waits for it to propagate.
1844    ///
1845    /// If `timeout` is `None`, then this function will start the flush, but NOT wait for it to finish.
1846    ///
1847    /// If a `timeout` is given, then the function will block until that timeout is reached,
1848    /// an error occurs, or the flush is complete.
1849    ///
1850    /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
1851    fn flush(&self, timeout: Option<Duration>) -> Result<(), SinkFlushError> {
1852        if self.is_forked_child() {
1853            return Err(SinkFlushError::failed(
1854                "Fork detected during flush. cleanup_if_forked() should always be called after forking. This is likely a bug in the Rerun SDK.",
1855            ));
1856        }
1857
1858        let f = move |inner: &RecordingStreamInner| -> Result<(), SinkFlushError> {
1859            // 0. Wait for all pending data loader threads to complete
1860            //
1861            // This ensures that data from `log_file_from_path` and `log_file_from_contents`
1862            // is fully loaded before we flush the batcher and sink.
1863            inner.wait_for_dataloaders();
1864
1865            // 1. Synchronously flush the batcher down the chunk channel
1866            //
1867            // NOTE: This _has_ to be done synchronously as we need to be guaranteed that all chunks
1868            // are ready to be drained by the time this call returns.
1869            // It cannot block indefinitely and is fairly fast as it only requires compute (no I/O).
1870            inner
1871                .batcher
1872                .flush_blocking(Duration::MAX)
1873                .map_err(|err| match err {
1874                    BatcherFlushError::Closed => SinkFlushError::failed(err.to_string()),
1875                    BatcherFlushError::Timeout => SinkFlushError::Timeout,
1876                })?;
1877
1878            // 2. Drain all pending chunks from the batcher's channel _before_ any other future command
1879            inner
1880                .cmds_tx
1881                .send(Command::PopPendingChunks)
1882                .map_err(|_ignored| {
1883                    SinkFlushError::failed(
1884                        "Sink shut down prematurely. This is likely a bug in the Rerun SDK.",
1885                    )
1886                })?;
1887
1888            // 3. Asynchronously flush everything down the sink
1889            let (cmd, on_done) = Command::flush(Duration::MAX); // The background thread should block forever if necessary
1890            inner.cmds_tx.send(cmd).map_err(|_ignored| {
1891                SinkFlushError::failed(
1892                    "Sink shut down prematurely. This is likely a bug in the Rerun SDK.",
1893                )
1894            })?;
1895
1896            if let Some(timeout) = timeout {
1897                on_done.recv_timeout(timeout).map_err(|err| match err {
1898                    RecvTimeoutError::Timeout => SinkFlushError::Timeout,
1899                    RecvTimeoutError::Disconnected => SinkFlushError::failed(
1900                        "Flush never finished. This is likely a bug in the Rerun SDK.",
1901                    ),
1902                })??;
1903            }
1904
1905            Ok(())
1906        };
1907
1908        match self.with(f) {
1909            Some(Ok(())) => Ok(()),
1910            Some(Err(err)) => Err(err),
1911            None => {
1912                re_log::warn_once!("Recording disabled - call to flush ignored");
1913                Ok(())
1914            }
1915        }
1916    }
1917}
1918
1919impl RecordingStream {
1920    /// Stream data to multiple different sinks.
1921    ///
1922    /// This is semantically the same as calling [`RecordingStream::set_sink`], but the resulting
1923    /// [`RecordingStream`] will now stream data to multiple sinks at the same time.
1924    ///
1925    /// Currently only supports [`GrpcSink`][grpc_sink] and [`FileSink`][file_sink].
1926    ///
1927    /// If the batcher's configuration has not been set explicitly or by environment variables,
1928    /// this will switch the batcher to a conservative default derived from the new sinks
1929    /// (there is no guarantee as to when exactly the new configuration becomes active).
1930    ///
1931    /// [grpc_sink]: crate::sink::GrpcSink
1932    /// [file_sink]: crate::sink::FileSink
1933    pub fn set_sinks(&self, sinks: impl crate::log_sink::IntoMultiSink) {
1934        if forced_sink_path().is_some() {
1935            re_log::debug!("Ignored setting new MultiSink since {ENV_FORCE_SAVE} is set");
1936            return;
1937        }
1938
1939        let sink = sinks.into_multi_sink();
1940
1941        self.set_sink(Box::new(sink));
1942    }
1943
1944    /// Asynchronously calls a method that has read access to the currently active sink.
1945    ///
1946    /// Since a recording stream's sink is owned by a different thread, there is no guarantee
1947    /// as to when the callback will be invoked.
1948    /// Return from the callback as quickly as possible: the sink will not receive
1949    /// any new data for as long as the callback is running.
1950    ///
1951    /// # Experimental
1952    ///
1953    /// This is an experimental API and may change in future releases.
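    ///
    /// # Example
    ///
    /// A minimal sketch; the callback only gets read access to the sink:
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// rec.inspect_sink(|sink| {
    ///     // Keep this quick: the sink receives no new data until we return.
    ///     let _ = sink;
    /// });
    /// ```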
1954    // TODO(#10444): This should become a lot more straight forward with explicit sinks.
1955    pub fn inspect_sink(&self, f: impl FnOnce(&dyn LogSink) + Send + 'static) {
1956        self.with(|inner| inner.cmds_tx.send(Command::InspectSink(Box::new(f))).ok());
1957    }
1958
1959    /// Swaps the underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to use
1960    /// the default address (`rerun+http://127.0.0.1:9876/proxy`).
1961    ///
1962    /// See also [`Self::connect_grpc_opts`] if you wish to configure the connection.
1963    ///
1964    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
1965    /// terms of data durability and ordering.
1966    /// See [`Self::set_sink`] for more information.
1967    pub fn connect_grpc(&self) -> RecordingStreamResult<()> {
1968        self.connect_grpc_opts(format!(
1969            "rerun+http://127.0.0.1:{}/proxy",
1970            re_grpc_server::DEFAULT_SERVER_PORT
1971        ))
1972    }
1973
1974    /// Swaps the underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to use
1975    /// the specified address.
1976    ///
1977    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
1978    /// terms of data durability and ordering.
1979    /// See [`Self::set_sink`] for more information.
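    ///
    /// # Example
    ///
    /// A minimal sketch; the URL must be a Rerun proxy endpoint:
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// if let Err(err) = rec.connect_grpc_opts("rerun+http://127.0.0.1:9876/proxy") {
    ///     eprintln!("failed to connect: {err}");
    /// }
    /// ```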
1984    pub fn connect_grpc_opts(&self, url: impl Into<String>) -> RecordingStreamResult<()> {
1985        if forced_sink_path().is_some() {
1986            re_log::debug!("Ignored setting new GrpcSink since {ENV_FORCE_SAVE} is set");
1987            return Ok(());
1988        }
1989
1990        let url: String = url.into();
1991        let re_uri::RedapUri::Proxy(uri) = url.as_str().parse()? else {
1992            return Err(RecordingStreamError::NotAProxyEndpoint);
1993        };
1994
1995        let sink = crate::log_sink::GrpcSink::new(uri);
1996
1997        self.set_sink(Box::new(sink));
1998        Ok(())
1999    }
2000
2001    #[cfg(feature = "server")]
2002    /// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] pre-configured to listen on
2003    /// `rerun+http://127.0.0.1:9876/proxy`.
2004    ///
2005    /// To configure the gRPC server's IP and port, use [`Self::serve_grpc_opts`] instead.
2006    ///
2007    /// You can connect a viewer to it with `rerun --connect`.
2008    ///
2009    /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
2010    /// You can limit the amount of data buffered by the gRPC server with the `server_options` argument.
2011    /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
2012    pub fn serve_grpc(
2013        &self,
2014        server_options: re_grpc_server::ServerOptions,
2015    ) -> RecordingStreamResult<()> {
2016        self.serve_grpc_opts("0.0.0.0", crate::DEFAULT_SERVER_PORT, server_options)
2017    }
2018
2019    #[cfg(feature = "server")]
2020    /// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] pre-configured to listen on
2021    /// `rerun+http://{bind_ip}:{port}/proxy`.
2022    ///
2023    /// `0.0.0.0` is a good default for `bind_ip`.
2024    ///
2025    /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
2026    /// You can limit the amount of data buffered by the gRPC server with the `server_options` argument.
2027    /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
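    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the server options were configured elsewhere:
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// # let server_options: re_grpc_server::ServerOptions = unimplemented!();
    /// // Listen on all interfaces, on the default port.
    /// if let Err(err) = rec.serve_grpc_opts("0.0.0.0", 9876, server_options) {
    ///     eprintln!("failed to start gRPC server: {err}");
    /// }
    /// ```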
2028    pub fn serve_grpc_opts(
2029        &self,
2030        bind_ip: impl AsRef<str>,
2031        port: u16,
2032        server_options: re_grpc_server::ServerOptions,
2033    ) -> RecordingStreamResult<()> {
2034        if forced_sink_path().is_some() {
2035            re_log::debug!("Ignored setting GrpcServerSink since {ENV_FORCE_SAVE} is set");
2036            return Ok(());
2037        }
2038
2039        let sink = crate::grpc_server::GrpcServerSink::new(bind_ip.as_ref(), port, server_options)?;
2040
2041        self.set_sink(Box::new(sink));
2042        Ok(())
2043    }
2044
2045    /// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
2046    /// underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to send data to that
2047    /// new process.
2048    ///
2049    /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
2050    /// that viewer instead of starting a new one.
2051    ///
2052    /// See also [`Self::spawn_opts`] if you wish to configure the behavior of the Rerun process
2053    /// as well as the underlying connection.
2054    ///
2055    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2056    /// terms of data durability and ordering.
2057    /// See [`Self::set_sink`] for more information.
2058    pub fn spawn(&self) -> RecordingStreamResult<()> {
2059        self.spawn_opts(&Default::default())
2060    }
2061
2062    /// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
2063    /// underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to send data to that
2064    /// new process.
2065    ///
2066    /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
2067    /// that viewer instead of starting a new one.
2068    ///
2069    /// The behavior of the spawned Viewer can be configured via `opts`.
2070    /// If you're fine with the default behavior, refer to the simpler [`Self::spawn`].
2071    ///
2072    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2073    /// terms of data durability and ordering.
2074    /// See [`Self::set_sink`] for more information.
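    ///
    /// # Example
    ///
    /// A minimal sketch using the default spawn options:
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// if let Err(err) = rec.spawn_opts(&Default::default()) {
    ///     eprintln!("failed to spawn a viewer: {err}");
    /// }
    /// ```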
2079    pub fn spawn_opts(&self, opts: &crate::SpawnOptions) -> RecordingStreamResult<()> {
2080        if !self.is_enabled() {
2081            re_log::debug!("Rerun disabled - call to spawn() ignored");
2082            return Ok(());
2083        }
2084        if forced_sink_path().is_some() {
2085            re_log::debug!("Ignored setting new GrpcSink since {ENV_FORCE_SAVE} is set");
2086            return Ok(());
2087        }
2088
2089        crate::spawn(opts)?;
2090
2091        self.connect_grpc_opts(format!("rerun+http://{}/proxy", opts.connect_addr()))?;
2092
2093        Ok(())
2094    }
2095
2096    /// Swaps the underlying sink for a [`crate::sink::MemorySink`] sink and returns the associated
2097    /// [`MemorySinkStorage`].
2098    ///
2099    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2100    /// terms of data durability and ordering.
2101    /// See [`Self::set_sink`] for more information.
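    ///
    /// # Example
    ///
    /// A minimal sketch that captures everything logged so far in memory:
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// let storage = rec.memory();
    /// // … log some data …
    /// let msgs = storage.take(); // drain everything captured so far
    /// println!("captured {} messages", msgs.len());
    /// ```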
2102    pub fn memory(&self) -> MemorySinkStorage {
2103        let sink = crate::sink::MemorySink::new(self.clone());
2104        let storage = sink.buffer();
2105        self.set_sink(Box::new(sink));
2106        storage
2107    }
2108
2109    /// Swaps the underlying sink for a [`crate::sink::BinaryStreamSink`] sink and returns the associated
2110    /// [`BinaryStreamStorage`].
2111    ///
2112    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2113    /// terms of data durability and ordering.
2114    /// See [`Self::set_sink`] for more information.
2115    pub fn binary_stream(&self) -> BinaryStreamStorage {
2116        let (sink, storage) = crate::sink::BinaryStreamSink::new(self.clone());
2117        self.set_sink(Box::new(sink));
2118        storage
2119    }
2120
2121    /// Swaps the underlying sink for a [`crate::sink::FileSink`] at the specified `path`.
2122    ///
2123    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2124    /// terms of data durability and ordering.
2125    /// See [`Self::set_sink`] for more information.
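    ///
    /// # Example
    ///
    /// A minimal sketch writing the recording to an `.rrd` file (hypothetical path):
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// if let Err(err) = rec.save("my_recording.rrd") {
    ///     eprintln!("failed to save recording: {err}");
    /// }
    /// ```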
2126    pub fn save(
2127        &self,
2128        path: impl Into<std::path::PathBuf>,
2129    ) -> Result<(), crate::sink::FileSinkError> {
2130        self.save_opts(path)
2131    }
2132
2133    /// Swaps the underlying sink for a [`crate::sink::FileSink`] at the specified `path`.
2134    ///
2135    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2136    /// terms of data durability and ordering.
2137    /// See [`Self::set_sink`] for more information.
2141    pub fn save_opts(
2142        &self,
2143        path: impl Into<std::path::PathBuf>,
2144    ) -> Result<(), crate::sink::FileSinkError> {
2145        if forced_sink_path().is_some() {
2146            re_log::debug!("Ignored setting new file since {ENV_FORCE_SAVE} is set");
2147            return Ok(());
2148        }
2149
2150        let sink = crate::sink::FileSink::new(path)?;
2151
2152        self.set_sink(Box::new(sink));
2153
2154        Ok(())
2155    }
2156
2157    /// Swaps the underlying sink for a [`crate::sink::FileSink`] pointed at stdout.
2158    ///
2159    /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
2160    /// default back to `buffered` mode, in order not to break the user's terminal.
2161    ///
2162    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2163    /// terms of data durability and ordering.
2164    /// See [`Self::set_sink`] for more information.
2165    pub fn stdout(&self) -> Result<(), crate::sink::FileSinkError> {
2166        self.stdout_opts()
2167    }
2168
2169    /// Swaps the underlying sink for a [`crate::sink::FileSink`] pointed at stdout.
2170    ///
2171    /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
2172    /// default back to `buffered` mode, in order not to break the user's terminal.
2173    ///
2174    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2175    /// terms of data durability and ordering.
2176    /// See [`Self::set_sink`] for more information.
2180    pub fn stdout_opts(&self) -> Result<(), crate::sink::FileSinkError> {
2181        if forced_sink_path().is_some() {
2182            re_log::debug!("Ignored setting new file since {ENV_FORCE_SAVE} is set");
2183            return Ok(());
2184        }
2185
2186        if std::io::stdout().is_terminal() {
2187            re_log::debug!("Ignored call to stdout() because stdout is a terminal");
2188            self.set_sink(Box::new(crate::log_sink::BufferedSink::new()));
2189            return Ok(());
2190        }
2191
2192        let sink = crate::sink::FileSink::stdout()?;
2193
2194        self.set_sink(Box::new(sink));
2195
2196        Ok(())
2197    }
2198
2199    /// Swaps the underlying sink for a [`crate::sink::BufferedSink`].
2200    ///
2201    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2202    /// terms of data durability and ordering.
2203    /// See [`Self::set_sink`] for more information.
2204    pub fn disconnect(&self) {
2205        let f = move |inner: &RecordingStreamInner| {
2206            // When disconnecting, we need to make sure that pending top-level `DataLoader` threads that
2207            // were started from the SDK run to completion.
2208            inner.wait_for_dataloaders();
2209            self.set_sink(Box::new(crate::sink::BufferedSink::new()));
2210        };
2211
2212        if self.with(f).is_none() {
2213            re_log::warn_once!("Recording disabled - call to disconnect() ignored");
2214        }
2215    }
2216
2217    /// Send a blueprint through this recording stream.
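    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the blueprint messages and activation command were
    /// produced elsewhere (e.g. by a blueprint builder):
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// # let blueprint: Vec<re_log_types::LogMsg> = unimplemented!();
    /// # let activation_cmd: re_log_types::BlueprintActivationCommand = unimplemented!();
    /// // The activation command is only recorded once the full blueprint went through.
    /// rec.send_blueprint(blueprint, activation_cmd);
    /// ```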
2218    pub fn send_blueprint(
2219        &self,
2220        blueprint: Vec<LogMsg>,
2221        activation_cmd: BlueprintActivationCommand,
2222    ) {
2223        let mut blueprint_id = None;
2224        for msg in blueprint {
2225            if blueprint_id.is_none() {
2226                blueprint_id = Some(msg.store_id().clone());
2227            }
2228            self.record_msg(msg);
2229        }
2230
2231        if let Some(blueprint_id) = blueprint_id {
2232            if blueprint_id == activation_cmd.blueprint_id {
2233                // Let the viewer know that the blueprint has been fully received,
2234                // and that it can now be activated.
2235                // We don't want to activate half-loaded blueprints, because that can be confusing,
2236                // and can also lead to problems with view heuristics.
2237                self.record_msg(activation_cmd.into());
2238            } else {
2239                re_log::warn!(
2240                    "Blueprint ID mismatch when sending blueprint: {:?} != {:?}. Ignoring activation.",
2241                    blueprint_id,
2242                    activation_cmd.blueprint_id
2243                );
2244            }
2245        }
2246    }
2247}
2248
2249impl fmt::Debug for RecordingStream {
2250    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2251        let with = |inner: &RecordingStreamInner| {
2252            let RecordingStreamInner {
2253                // This pattern match prevents _accidentally_ omitting data from the debug output
2254                // when new fields are added.
2255                store_info,
2256                recording_info,
2257                tick,
2258                cmds_tx: _,
2259                batcher: _,
2260                batcher_to_sink_handle: _,
2261                sink_dependent_batcher_config,
2262                dataloader_handles,
2263                pid_at_creation,
2264            } = inner;
2265
2266            f.debug_struct("RecordingStream")
2267                .field("store_info", &store_info)
2268                .field("recording_info", &recording_info)
2269                .field("tick", &tick)
2270                .field(
2271                    "sink_dependent_batcher_config",
2272                    &sink_dependent_batcher_config,
2273                )
2274                .field("pending_dataloaders", &dataloader_handles.lock().len())
2275                .field("pid_at_creation", &pid_at_creation)
2276                .finish_non_exhaustive()
2277        };
2278
2279        match self.with(with) {
2280            Some(res) => res,
2281            None => write!(f, "RecordingStream {{ disabled }}"),
2282        }
2283    }
2284}
2285
2286// --- Stateful time ---
2287
2288/// Thread-local data.
2289#[derive(Default)]
2290struct ThreadInfo {
2291    /// The current time per-thread per-recording, which can be set by users.
2292    timepoints: HashMap<StoreId, TimePoint>,
2293}
2294
2295impl ThreadInfo {
2296    fn thread_now(rid: &StoreId) -> TimePoint {
2297        Self::with(|ti| ti.now(rid))
2298    }
2299
2300    fn set_thread_time(rid: &StoreId, timeline: TimelineName, cell: TimeCell) {
2301        Self::with(|ti| ti.set_time(rid, timeline, cell));
2302    }
2303
2304    fn unset_thread_time(rid: &StoreId, timeline: &TimelineName) {
2305        Self::with(|ti| ti.unset_time(rid, timeline));
2306    }
2307
2308    fn reset_thread_time(rid: &StoreId) {
2309        Self::with(|ti| ti.reset_time(rid));
2310    }
2311
2312    /// Get access to the thread-local [`ThreadInfo`].
2313    fn with<R>(f: impl FnOnce(&mut Self) -> R) -> R {
2314        use std::cell::RefCell;
2315        thread_local! {
2316            static THREAD_INFO: RefCell<Option<ThreadInfo>> = const { RefCell::new(None) };
2317        }
2318
2319        THREAD_INFO.with(|thread_info| {
2320            let mut thread_info = thread_info.borrow_mut();
2321            let thread_info = thread_info.get_or_insert_with(Self::default);
2322            f(thread_info)
2323        })
2324    }
2325
2326    fn now(&self, rid: &StoreId) -> TimePoint {
2327        let mut timepoint = self.timepoints.get(rid).cloned().unwrap_or_default();
2328        timepoint.insert_cell(TimelineName::log_time(), TimeCell::timestamp_now());
2329        timepoint
2330    }
2331
2332    fn set_time(&mut self, rid: &StoreId, timeline: TimelineName, cell: TimeCell) {
2333        self.timepoints
2334            .entry(rid.clone())
2335            .or_default()
2336            .insert_cell(timeline, cell);
2337    }
2338
2339    fn unset_time(&mut self, rid: &StoreId, timeline: &TimelineName) {
2340        if let Some(timepoint) = self.timepoints.get_mut(rid) {
2341            timepoint.remove(timeline);
2342        }
2343    }
2344
2345    fn reset_time(&mut self, rid: &StoreId) {
2346        if let Some(timepoint) = self.timepoints.get_mut(rid) {
2347            *timepoint = TimePoint::default();
2348        }
2349    }
2350}
2351
2352impl RecordingStream {
2353    /// Returns the current time of the recording on the current thread.
2354    pub fn now(&self) -> TimePoint {
2355        let f =
2356            move |inner: &RecordingStreamInner| ThreadInfo::thread_now(&inner.store_info.store_id);
2357        if let Some(res) = self.with(f) {
2358            res
2359        } else {
2360            re_log::warn_once!("Recording disabled - call to now() ignored");
2361            TimePoint::default()
2362        }
2363    }
2364
2365    /// Set the current time of the recording, for the current calling thread.
2366    ///
2367    /// Used for all subsequent logging performed from this same thread, until the next call
2368    /// to one of the index/time setting methods.
2369    ///
2370    /// There is no requirement of monotonicity. You can move the time backwards if you like.
2371    ///
2372    /// See also:
2373    /// - [`Self::set_time`]
2374    /// - [`Self::set_time_sequence`]
2375    /// - [`Self::set_duration_secs`]
2376    /// - [`Self::disable_timeline`]
2377    /// - [`Self::reset_time`]
2378    pub fn set_timepoint(&self, timepoint: impl Into<TimePoint>) {
2379        let f = move |inner: &RecordingStreamInner| {
2380            let timepoint = timepoint.into();
2381            for (timeline, time) in timepoint {
2382                ThreadInfo::set_thread_time(&inner.store_info.store_id, timeline, time);
2383            }
2384        };
2385
2386        if self.with(f).is_none() {
2387            re_log::warn_once!("Recording disabled - call to set_timepoint() ignored");
2388        }
2389    }
2390
2391    /// Set the current value of one of the timelines.
2392    ///
2393    /// Used for all subsequent logging performed from this same thread, until the next call
2394    /// to one of the index/time setting methods.
2395    ///
2396    /// There is no requirement of monotonicity. You can move the time backwards if you like.
2397    ///
2398    /// Example:
2399    /// ```no_run
2400    /// # mod rerun { pub use re_sdk::*; }
2401    /// # let rec: rerun::RecordingStream = unimplemented!();
2402    /// rec.set_time("frame_nr", rerun::TimeCell::from_sequence(42));
2403    /// rec.set_time("duration", std::time::Duration::from_millis(123));
2404    /// rec.set_time("capture_time", std::time::SystemTime::now());
2405    /// ```
2406    ///
2407    /// See also:
2408    /// - [`Self::set_timepoint`]
2409    /// - [`Self::set_time_sequence`]
2410    /// - [`Self::set_duration_secs`]
2411    /// - [`Self::disable_timeline`]
2412    /// - [`Self::reset_time`]
2413    pub fn set_time(&self, timeline: impl Into<TimelineName>, value: impl TryInto<TimeCell>) {
2414        let f = move |inner: &RecordingStreamInner| {
2415            let timeline = timeline.into();
2416            if let Ok(value) = value.try_into() {
2417                ThreadInfo::set_thread_time(&inner.store_info.store_id, timeline, value);
2418            } else {
2419                re_log::warn_once!(
2420                    "set_time({timeline}): Failed to convert the given value to an TimeCell"
2421                );
2422            }
2423        };
2424
2425        if self.with(f).is_none() {
2426            re_log::warn_once!("Recording disabled - call to set_time() ignored");
2427        }
2428    }
2429
2430    /// Set the current time of the recording, for the current calling thread.
2431    ///
2432    /// Short for `set_time(timeline, rerun::TimeCell::from_sequence(sequence))`.
2433    ///
2434    /// Used for all subsequent logging performed from this same thread, until the next call
2435    /// to one of the index/time setting methods.
2436    ///
2437    /// For example: `rec.set_time_sequence("frame_nr", frame_nr)`.
2438    /// You can remove a timeline again using `rec.disable_timeline("frame_nr")`.
2439    ///
2440    /// There is no requirement of monotonicity. You can move the time backwards if you like.
2441    ///
2442    /// See also:
2443    /// - [`Self::set_time`]
2444    /// - [`Self::set_timepoint`]
2445    /// - [`Self::set_duration_secs`]
2446    /// - [`Self::disable_timeline`]
2447    /// - [`Self::reset_time`]
2448    #[inline]
2449    pub fn set_time_sequence(&self, timeline: impl Into<TimelineName>, sequence: impl Into<i64>) {
2450        self.set_time(timeline, TimeCell::from_sequence(sequence.into()));
2451    }
2452
2453    /// Set the current time of the recording, for the current calling thread.
2454    ///
2455    /// Short for `set_time(timeline, std::time::Duration::from_secs_f64(secs))`.
2456    ///
2457    /// Used for all subsequent logging performed from this same thread, until the next call
2458    /// to one of the index/time setting methods.
2459    ///
2460    /// For example: `rec.set_duration_secs("time_since_start", time_offset)`.
2461    /// You can remove a timeline again using `rec.disable_timeline("time_since_start")`.
2462    ///
2463    /// There is no requirement of monotonicity. You can move the time backwards if you like.
2464    ///
2465    /// See also:
2466    /// - [`Self::set_time`]
2467    /// - [`Self::set_timepoint`]
2468    /// - [`Self::set_timestamp_secs_since_epoch`]
2469    /// - [`Self::set_time_sequence`]
2470    /// - [`Self::disable_timeline`]
2471    /// - [`Self::reset_time`]
2472    #[inline]
2473    pub fn set_duration_secs(&self, timeline: impl Into<TimelineName>, secs: impl Into<f64>) {
2474        let secs = secs.into();
2475        if let Ok(duration) = std::time::Duration::try_from_secs_f64(secs) {
2476            self.set_time(timeline, duration);
2477        } else {
2478            re_log::error_once!("set_duration_secs: can't set time to {secs}");
2479        }
2480    }
2481
2482    /// Set a timestamp as seconds since Unix epoch (1970-01-01 00:00:00 UTC).
2483    ///
2484    /// Short for `self.set_time(timeline, rerun::TimeCell::from_timestamp_secs_since_epoch(secs))`.
2485    ///
2486    /// Used for all subsequent logging performed from this same thread, until the next call
2487    /// to one of the index/time setting methods.
2488    ///
2489    /// You can remove a timeline again using `rec.disable_timeline(timeline)`.
2490    ///
2491    /// There is no requirement of monotonicity. You can move the time backwards if you like.
2492    ///
2493    /// See also:
2494    /// - [`Self::set_time`]
2495    /// - [`Self::set_timepoint`]
2496    /// - [`Self::set_duration_secs`]
2497    /// - [`Self::set_time_sequence`]
2498    /// - [`Self::set_timestamp_nanos_since_epoch`]
2499    /// - [`Self::disable_timeline`]
2500    /// - [`Self::reset_time`]
2501    #[inline]
2502    pub fn set_timestamp_secs_since_epoch(
2503        &self,
2504        timeline: impl Into<TimelineName>,
2505        secs: impl Into<f64>,
2506    ) {
2507        self.set_time(
2508            timeline,
2509            TimeCell::from_timestamp_secs_since_epoch(secs.into()),
2510        );
2511    }
2512
2513    /// Set a timestamp as nanoseconds since Unix epoch (1970-01-01 00:00:00 UTC).
2514    ///
2515    /// Short for `self.set_time(timeline, rerun::TimeCell::from_timestamp_nanos_since_epoch(nanos))`.
2516    ///
2517    /// Used for all subsequent logging performed from this same thread, until the next call
2518    /// to one of the index/time setting methods.
2519    ///
2520    /// You can remove a timeline again using `rec.disable_timeline(timeline)`.
2521    ///
2522    /// There is no requirement of monotonicity. You can move the time backwards if you like.
2523    ///
2524    /// See also:
2525    /// - [`Self::set_time`]
2526    /// - [`Self::set_timepoint`]
2527    /// - [`Self::set_duration_secs`]
2528    /// - [`Self::set_time_sequence`]
2529    /// - [`Self::set_timestamp_secs_since_epoch`]
2530    /// - [`Self::disable_timeline`]
2531    /// - [`Self::reset_time`]
2532    #[inline]
2533    pub fn set_timestamp_nanos_since_epoch(
2534        &self,
2535        timeline: impl Into<TimelineName>,
2536        nanos: impl Into<i64>,
2537    ) {
2538        self.set_time(
2539            timeline,
2540            TimeCell::from_timestamp_nanos_since_epoch(nanos.into()),
2541        );
2542    }
2543
2544    /// Clears out the current time of the recording for the specified timeline, for the
2545    /// current calling thread.
2546    ///
2547    /// For example: `rec.disable_timeline("frame")`, `rec.disable_timeline("sim_time")`.
2548    ///
2549    /// See also:
2550    /// - [`Self::set_timepoint`]
2551    /// - [`Self::set_time_sequence`]
2552    /// - [`Self::reset_time`]
2553    pub fn disable_timeline(&self, timeline: impl Into<TimelineName>) {
2554        let f = move |inner: &RecordingStreamInner| {
2555            let timeline = timeline.into();
2556            ThreadInfo::unset_thread_time(&inner.store_info.store_id, &timeline);
2557        };
2558
2559        if self.with(f).is_none() {
2560            re_log::warn_once!("Recording disabled - call to disable_timeline() ignored");
2561        }
2562    }
2563
2564    /// Clears out the current time of the recording, for the current calling thread.
2565    ///
2566    /// Used for all subsequent logging performed from this same thread, until the next call
2567    /// to one of the index/time setting methods.
2568    ///
2569    /// For example: `rec.reset_time()`.
2570    ///
2571    /// See also:
2572    /// - [`Self::set_timepoint`]
2573    /// - [`Self::set_time_sequence`]
2574    /// - [`Self::disable_timeline`]
2575    pub fn reset_time(&self) {
2576        let f = move |inner: &RecordingStreamInner| {
2577            ThreadInfo::reset_thread_time(&inner.store_info.store_id);
2578        };
2579
2580        if self.with(f).is_none() {
2581            re_log::warn_once!("Recording disabled - call to reset_time() ignored");
2582        }
2583    }
2584}
2585
2586// ---
2587
2588#[cfg(test)]
2589mod tests {
2590    use insta::assert_debug_snapshot;
2591    use itertools::Itertools as _;
2592    use re_log_types::example_components::{MyLabel, MyPoints};
2593    use re_sdk_types::SerializedComponentBatch;
2594
2595    use super::*;
2596
2597    struct DisplayDescrs(Chunk);
2598
2599    impl std::fmt::Debug for DisplayDescrs {
2600        #[inline]
2601        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2602            f.debug_list()
2603                .entries(
2604                    self.0
2605                        .component_descriptors()
2606                        .map(|d| d.display_name().to_owned())
2607                        .sorted(),
2608                )
2609                .finish()
2610        }
2611    }
2612
2613    #[test]
2614    fn impl_send_sync() {
2615        fn assert_send_sync<T: Send + Sync>() {}
2616        assert_send_sync::<RecordingStream>();
2617    }
2618
2619    #[test]
2620    fn never_flush() {
2621        let rec = RecordingStreamBuilder::new("rerun_example_never_flush")
2622            .enabled(true)
2623            .batcher_config(ChunkBatcherConfig::NEVER)
2624            .buffered()
2625            .unwrap();
2626
2627        let store_info = rec.store_info().unwrap();
2628
2629        let rows = example_rows(false);
2630        for row in rows.clone() {
2631            rec.record_row("a".into(), row, false);
2632        }
2633
2634        let storage = rec.memory();
2635        let mut msgs = {
2636            let mut msgs = storage.take();
2637            msgs.reverse();
2638            msgs
2639        };
2640
2641        // First message should be a set_store_info resulting from the original sink swap to
2642        // buffered mode.
2643        match msgs.pop().unwrap() {
2644            LogMsg::SetStoreInfo(msg) => {
2645                assert!(msg.row_id != *RowId::ZERO);
2646                similar_asserts::assert_eq!(store_info, msg.info);
2647            }
2648            _ => panic!("expected SetStoreInfo"),
2649        }
2650
2651        // Second message should be a set_store_info resulting from the later sink swap from
2652        // buffered mode into in-memory mode.
2653        // This arrives _before_ the data itself since we're using manual flushing.
2654        match msgs.pop().unwrap() {
2655            LogMsg::SetStoreInfo(msg) => {
2656                assert!(msg.row_id != *RowId::ZERO);
2657                similar_asserts::assert_eq!(store_info, msg.info);
2658            }
2659            _ => panic!("expected SetStoreInfo"),
2660        }
2661
2662        // The following flushes were sent as a result of the implicit flush when swapping the
2663        // underlying sink from buffered to in-memory.

        // Chunk that contains the `RecordingInfo`.
        match msgs.pop().unwrap() {
            LogMsg::ArrowMsg(rid, msg) => {
                assert_eq!(store_info.store_id, rid);

                let chunk = Chunk::from_arrow_msg(&msg).unwrap();

                chunk.sanity_check().unwrap();

                assert_debug_snapshot!(DisplayDescrs(chunk));
            }
            _ => panic!("expected ArrowMsg"),
        }

        // Final message is the batched chunk itself.
        match msgs.pop().unwrap() {
            LogMsg::ArrowMsg(rid, msg) => {
                assert_eq!(store_info.store_id, rid);

                let chunk = Chunk::from_arrow_msg(&msg).unwrap();

                chunk.sanity_check().unwrap();

                assert_debug_snapshot!(DisplayDescrs(chunk));
            }
            _ => panic!("expected ArrowMsg"),
        }

        // That's all.
        assert!(msgs.pop().is_none());
    }

    #[test]
    fn always_flush() {
        let rec = RecordingStreamBuilder::new("rerun_example_always_flush")
            .enabled(true)
            .batcher_config(ChunkBatcherConfig::ALWAYS)
            .buffered()
            .unwrap();

        let store_info = rec.store_info().unwrap();

        let rows = example_rows(false);
        for row in rows.clone() {
            rec.record_row("a".into(), row, false);
        }

        let storage = rec.memory();
        let mut msgs = {
            let mut msgs = storage.take();
            msgs.reverse();
            msgs
        };

        // First message should be a set_store_info resulting from the original sink swap to
        // buffered mode.
        match msgs.pop().unwrap() {
            LogMsg::SetStoreInfo(msg) => {
                assert!(msg.row_id != *RowId::ZERO);
                similar_asserts::assert_eq!(store_info, msg.info);
            }
            _ => panic!("expected SetStoreInfo"),
        }

        // Second message should be a set_store_info resulting from the later sink swap from
        // buffered mode into in-memory mode.
        // This arrives _before_ the data itself since we're using manual flushing.
        match msgs.pop().unwrap() {
            LogMsg::SetStoreInfo(msg) => {
                assert!(msg.row_id != *RowId::ZERO);
                similar_asserts::assert_eq!(store_info, msg.info);
            }
            _ => panic!("expected SetStoreInfo"),
        }

        let mut assert_next_row = || match msgs.pop().unwrap() {
            LogMsg::ArrowMsg(rid, msg) => {
                assert_eq!(store_info.store_id, rid);

                let chunk = Chunk::from_arrow_msg(&msg).unwrap();

                chunk.sanity_check().unwrap();

                assert_debug_snapshot!(DisplayDescrs(chunk));
            }
            _ => panic!("expected ArrowMsg"),
        };

        // The 3rd through 6th messages are the single-row batched chunks themselves, sent as a
        // result of the implicit flush when swapping the underlying sink from buffered to
        // in-memory. Note that these include the two recording-property chunks.
        assert_next_row(); // Contains `RecordingInfo`
        assert_next_row();
        assert_next_row();
        assert_next_row();

        // That's all.
        assert!(msgs.pop().is_none());
    }

    #[test]
    fn flush_hierarchy() {
        let (rec, storage) = RecordingStreamBuilder::new("rerun_example_flush_hierarchy")
            .enabled(true)
            .batcher_config(ChunkBatcherConfig::NEVER)
            .memory()
            .unwrap();

        let store_info = rec.store_info().unwrap();

        let rows = example_rows(false);
        for row in rows.clone() {
            rec.record_row("a".into(), row, false);
        }

        {
            let mut msgs = {
                let mut msgs = storage.take();
                msgs.reverse();
                msgs
            };

            // First message should be a set_store_info resulting from the original sink swap
            // to in-memory mode.
            match msgs.pop().unwrap() {
                LogMsg::SetStoreInfo(msg) => {
                    assert!(msg.row_id != *RowId::ZERO);
                    similar_asserts::assert_eq!(store_info, msg.info);
                }
                _ => panic!("expected SetStoreInfo"),
            }

            // For reasons not fully tracked down, MemorySink ends up with 2 StoreInfos.
            // TODO(jleibs): Avoid a redundant StoreInfo message.
            match msgs.pop().unwrap() {
                LogMsg::SetStoreInfo(msg) => {
                    assert!(msg.row_id != *RowId::ZERO);
                    similar_asserts::assert_eq!(store_info, msg.info);
                }
                _ => panic!("expected SetStoreInfo"),
            }

            // MemorySinkStorage transparently handles flushing during `take()`!

            // The batch that contains the `RecordingInfo`.
            match msgs.pop().unwrap() {
                LogMsg::ArrowMsg(rid, msg) => {
                    assert_eq!(store_info.store_id, rid);

                    let chunk = Chunk::from_arrow_msg(&msg).unwrap();

                    chunk.sanity_check().unwrap();

                    assert_debug_snapshot!(DisplayDescrs(chunk));
                }
                _ => panic!("expected ArrowMsg"),
            }

            // The batched chunk itself, sent as a result of the flush performed by `take()` above.
            match msgs.pop().unwrap() {
                LogMsg::ArrowMsg(rid, msg) => {
                    assert_eq!(store_info.store_id, rid);

                    let chunk = Chunk::from_arrow_msg(&msg).unwrap();

                    chunk.sanity_check().unwrap();

                    assert_debug_snapshot!(DisplayDescrs(chunk));
                }
                _ => panic!("expected ArrowMsg"),
            }

            // That's all.
            assert!(msgs.pop().is_none());
        }
    }

    #[test]
    fn disabled() {
        let (rec, storage) = RecordingStreamBuilder::new("rerun_example_disabled")
            .enabled(false)
            .batcher_config(ChunkBatcherConfig::ALWAYS)
            .memory()
            .unwrap();

        assert_eq!(rec.ref_count(), 0);

        let rows = example_rows(false);
        for row in rows.clone() {
            rec.record_row("a".into(), row, false);
        }

        let mut msgs = {
            let mut msgs = storage.take();
            msgs.reverse();
            msgs
        };

        // That's all.
        assert!(msgs.pop().is_none());
    }

    #[test]
    fn test_set_thread_local() {
        // Regression-test for https://github.com/rerun-io/rerun/issues/2889
        std::thread::Builder::new()
            .name("test_thread".to_owned())
            .spawn(|| {
                let stream = RecordingStreamBuilder::new("rerun_example_test")
                    .buffered()
                    .unwrap();
                RecordingStream::set_thread_local(StoreKind::Recording, Some(stream));
            })
            .unwrap()
            .join()
            .unwrap();
    }

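    /// Builds three example [`PendingRow`]s out of the `MyPoints` test archetype.
    ///
    /// When `static_` is false, each row gets a timepoint with `log_time`, a
    /// monotonically increasing `log_tick`, and a `frame_nr` sequence; when true,
    /// the timepoint stays empty so the rows end up logged as static data.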
    fn example_rows(static_: bool) -> Vec<PendingRow> {
        use re_log_types::example_components::{MyColor, MyLabel, MyPoint};
        use re_sdk_types::Loggable;

        let mut tick = 0i64;
        let mut timepoint = |frame_nr: i64| {
            let mut tp = TimePoint::default();
            if !static_ {
                tp.insert(Timeline::log_time(), re_log_types::Timestamp::now());
                tp.insert(Timeline::log_tick(), tick);
                tp.insert(Timeline::new_sequence("frame_nr"), frame_nr);
            }
            tick += 1;
            tp
        };

        let row0 = {
            PendingRow {
                row_id: RowId::new(),
                timepoint: timepoint(1),
                components: [
                    (
                        MyPoints::descriptor_points().component,
                        SerializedComponentBatch::new(
                            <MyPoint as Loggable>::to_arrow([
                                MyPoint::new(10.0, 10.0),
                                MyPoint::new(20.0, 20.0),
                            ])
                            .unwrap(),
                            MyPoints::descriptor_points(),
                        ),
                    ), //
                    (
                        MyPoints::descriptor_colors().component,
                        SerializedComponentBatch::new(
                            <MyColor as Loggable>::to_arrow([MyColor(0x8080_80FF)]).unwrap(),
                            MyPoints::descriptor_colors(),
                        ),
                    ), //
                    (
                        MyPoints::descriptor_labels().component,
                        SerializedComponentBatch::new(
                            <MyLabel as Loggable>::to_arrow([] as [MyLabel; 0]).unwrap(),
                            MyPoints::descriptor_labels(),
                        ),
                    ), //
                ]
                .into_iter()
                .collect(),
            }
        };

        let row1 = {
            PendingRow {
                row_id: RowId::new(),
                timepoint: timepoint(1),
                components: [
                    (
                        MyPoints::descriptor_points().component,
                        SerializedComponentBatch::new(
                            <MyPoint as Loggable>::to_arrow([] as [MyPoint; 0]).unwrap(),
                            MyPoints::descriptor_points(),
                        ),
                    ), //
                    (
                        MyPoints::descriptor_colors().component,
                        SerializedComponentBatch::new(
                            <MyColor as Loggable>::to_arrow([] as [MyColor; 0]).unwrap(),
                            MyPoints::descriptor_colors(),
                        ),
                    ), //
                    (
                        MyPoints::descriptor_labels().component,
                        SerializedComponentBatch::new(
                            <MyLabel as Loggable>::to_arrow([] as [MyLabel; 0]).unwrap(),
                            MyPoints::descriptor_labels(),
                        ),
                    ), //
                ]
                .into_iter()
                .collect(),
            }
        };

        let row2 = {
            PendingRow {
                row_id: RowId::new(),
                timepoint: timepoint(1),
                components: [
                    (
                        MyPoints::descriptor_points().component,
                        SerializedComponentBatch::new(
                            <MyPoint as Loggable>::to_arrow([] as [MyPoint; 0]).unwrap(),
                            MyPoints::descriptor_points(),
                        ),
                    ), //
                    (
                        MyPoints::descriptor_colors().component,
                        SerializedComponentBatch::new(
                            <MyColor as Loggable>::to_arrow([MyColor(0xFFFF_FFFF)]).unwrap(),
                            MyPoints::descriptor_colors(),
                        ),
                    ), //
                    (
                        MyPoints::descriptor_labels().component,
                        SerializedComponentBatch::new(
                            <MyLabel as Loggable>::to_arrow([MyLabel("hey".into())]).unwrap(),
                            MyPoints::descriptor_labels(),
                        ),
                    ), //
                ]
                .into_iter()
                .collect(),
            }
        };

        vec![row0, row1, row2]
    }

    // See <https://github.com/rerun-io/rerun/pull/8587> for context.
    #[test]
    fn allows_componentbatch_unsized() {
        let labels = [
            MyLabel("a".into()),
            MyLabel("b".into()),
            MyLabel("c".into()),
        ];

        let (rec, _mem) = RecordingStreamBuilder::new("rerun_example_test_componentbatch_unsized")
            .default_enabled(false)
            .enabled(false)
            .memory()
            .unwrap();

        // This call used to *not* compile due to a lack of `?Sized` bounds.
        use re_sdk_types::ComponentBatch as _;
        rec.log(
            "labels",
            &labels
                .try_serialized(MyPoints::descriptor_labels())
                .unwrap(),
        )
        .unwrap();
    }

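    /// A no-op [`LogSink`] whose only job is to report a custom default batcher
    /// config, so tests can observe sink-dependent config changes.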
    struct BatcherConfigTestSink {
        config: ChunkBatcherConfig,
    }

    impl LogSink for BatcherConfigTestSink {
        fn default_batcher_config(&self) -> ChunkBatcherConfig {
            self.config.clone()
        }

        fn send(&self, _msg: LogMsg) {
            // noop
        }

        fn flush_blocking(&self, _timeout: Duration) -> Result<(), SinkFlushError> {
            Ok(())
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

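    /// RAII guard that sets an environment variable on construction and removes
    /// it again on drop, so tests don't leak env state into one another.
    ///
    /// ```ignore
    /// let _guard = ScopedEnvVarSet::new("RERUN_FLUSH_NUM_BYTES", "456");
    /// // the env var is visible for the rest of this scope…
    /// // …and removed again when `_guard` is dropped.
    /// ```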
    struct ScopedEnvVarSet {
        key: &'static str,
    }

    impl ScopedEnvVarSet {
        #[expect(unsafe_code)]
        fn new(key: &'static str, value: &'static str) -> Self {
            // SAFETY: only used in tests.
            unsafe { std::env::set_var(key, value) };
            Self { key }
        }
    }

    impl Drop for ScopedEnvVarSet {
        #[expect(unsafe_code)]
        fn drop(&mut self) {
            // SAFETY: only used in tests.
            unsafe {
                std::env::remove_var(self.key);
            }
        }
    }

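    /// How long tests wait for the batcher to report a config change before giving up.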
    const CONFIG_CHANGE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);

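    /// Removes every batcher-related env var (including legacy spellings) so each
    /// test starts from a clean environment.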
    fn clear_environment() {
        // SAFETY: only used in tests.
        #[expect(unsafe_code)]
        unsafe {
            std::env::remove_var("RERUN_CHUNK_MAX_ROWS_IF_UNSORTED");
            std::env::remove_var("RERUN_FLUSH_NUM_BYTES");
            std::env::remove_var("RERUN_FLUSH_NUM_ROWS");
            std::env::remove_var("RERUN_FLUSH_TICK_SECS");
            std::env::remove_var("RERUN_MAX_CHUNK_ROWS_IF_UNSORTED");
        }
    }

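    /// Checks the batcher-config precedence chain when no explicit config is set:
    /// the active sink's default config applies, a newly installed sink overrides
    /// it, and env vars override the sink's default in turn.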
    #[test]
    fn test_sink_dependent_batcher_config() {
        clear_environment();

        let (tx, rx) = std::sync::mpsc::channel();

        let rec = RecordingStreamBuilder::new("rerun_example_test_batcher_config")
            .batcher_hooks(BatcherHooks {
                on_config_change: Some(Arc::new(move |config: &ChunkBatcherConfig| {
                    tx.send(config.clone()).unwrap();
                })),
                ..BatcherHooks::NONE
            })
            .buffered()
            .unwrap();

        let new_config = rx
            .recv_timeout(CONFIG_CHANGE_TIMEOUT)
            .expect("no config change message received within timeout");
        assert_eq!(
            new_config,
            ChunkBatcherConfig::from_env().unwrap(),
            "Buffered sink should use the config from the environment"
        );

        // Swap in our custom sink; its config should take over.
        let injected_config = ChunkBatcherConfig {
            flush_tick: std::time::Duration::from_secs(123),
            flush_num_bytes: 123,
            flush_num_rows: 123,
            ..new_config
        };
        rec.set_sink(Box::new(BatcherConfigTestSink {
            config: injected_config.clone(),
        }));
        let new_config = rx
            .recv_timeout(CONFIG_CHANGE_TIMEOUT)
            .expect("no config change message received within timeout");

        assert_eq!(new_config, injected_config);

        // Set `flush_num_bytes` through an env var and set the sink again,
        // checking that the env var overrides the sink's default.
        let _scoped_env_guard = ScopedEnvVarSet::new("RERUN_FLUSH_NUM_BYTES", "456");
        rec.set_sink(Box::new(BatcherConfigTestSink {
            config: injected_config.clone(),
        }));
        let new_config = rx
            .recv_timeout(CONFIG_CHANGE_TIMEOUT)
            .expect("no config change message received within timeout");
        assert_eq!(
            new_config,
            ChunkBatcherConfig {
                flush_num_bytes: 456,
                ..injected_config
            },
        );
    }

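    /// Checks that a config passed explicitly to the builder wins over both env
    /// vars and sink defaults, and survives later sink swaps.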
    #[test]
    fn test_explicit_batcher_config() {
        clear_environment();

        // This environment variable should *not* override the explicit config.
        let _scoped_env_guard = ScopedEnvVarSet::new("RERUN_FLUSH_TICK_SECS", "456");
        let explicit_config = ChunkBatcherConfig {
            flush_tick: std::time::Duration::from_secs(123),
            flush_num_bytes: 123,
            flush_num_rows: 123,
            ..ChunkBatcherConfig::DEFAULT
        };

        let (tx, rx) = std::sync::mpsc::channel();
        let rec = RecordingStreamBuilder::new("rerun_example_test_batcher_config")
            .batcher_config(explicit_config.clone())
            .batcher_hooks(BatcherHooks {
                on_config_change: Some(Arc::new(move |config: &ChunkBatcherConfig| {
                    tx.send(config.clone()).unwrap();
                })),
                ..BatcherHooks::NONE
            })
            .buffered()
            .unwrap();

        let new_config = rx
            .recv_timeout(CONFIG_CHANGE_TIMEOUT)
            .expect("no config change message received within timeout");
        assert_eq!(new_config, explicit_config);

        // Changing the sink should have no effect since an explicit config is in place.
        rec.set_sink(Box::new(BatcherConfigTestSink {
            config: ChunkBatcherConfig::ALWAYS,
        }));
        // Don't want to stall the test for CONFIG_CHANGE_TIMEOUT here.
        let new_config_recv_result = rx.recv_timeout(std::time::Duration::from_millis(100));
        assert_eq!(
            new_config_recv_result,
            Err(std::sync::mpsc::RecvTimeoutError::Timeout)
        );
    }
}