Skip to main content

re_sdk/
recording_stream.rs

1use std::fmt;
2use std::io::IsTerminal as _;
3use std::sync::atomic::AtomicI64;
4use std::sync::{Arc, Weak};
5use std::time::Duration;
6
7use ahash::HashMap;
8use crossbeam::channel::{Receiver, RecvTimeoutError, Sender};
9use itertools::Either;
10use nohash_hasher::IntMap;
11use parking_lot::Mutex;
12use re_chunk::{
13    BatcherFlushError, BatcherHooks, Chunk, ChunkBatcher, ChunkBatcherConfig, ChunkBatcherError,
14    ChunkComponents, ChunkError, ChunkId, PendingRow, RowId, TimeColumn,
15};
16use re_log_types::{
17    ApplicationId, ArrowRecordBatchReleaseCallback, BlueprintActivationCommand, EntityPath, LogMsg,
18    RecordingId, StoreId, StoreInfo, StoreKind, StoreSource, TimeCell, TimeInt, TimePoint,
19    Timeline, TimelineName,
20};
21use re_quota_channel::send_crossbeam;
22use re_sdk_types::archetypes::RecordingInfo;
23use re_sdk_types::components::Timestamp;
24use re_sdk_types::{AsComponents, SerializationError, SerializedComponentColumn};
25
26use crate::binary_stream_sink::BinaryStreamStorage;
27use crate::sink::{LogSink, MemorySinkStorage, SinkFlushError};
28
29// ---
30
/// Private, test-only environment variable.
///
/// When it is set, every recording stream writes to disk at the path it holds rather than
/// doing what it was asked to do - `connect_grpc()`, `buffered()`, even `save()` will re-use the same sink.
const ENV_FORCE_SAVE: &str = "_RERUN_TEST_FORCE_SAVE";

/// Returns the forced sink path, i.e. the value of the private environment variable
/// `_RERUN_TEST_FORCE_SAVE`, or `None` when that variable cannot be read.
///
/// Newly created [`RecordingStream`]s should use a [`crate::sink::FileSink`] pointing to this path.
/// Furthermore, [`RecordingStream::set_sink`] calls after this should not swap out to a new sink but re-use the existing one.
/// Note that creating a new [`crate::sink::FileSink`] to the same file path (even temporarily) can cause
/// a race between file creation (and thus clearing) and pending file writes.
pub fn forced_sink_path() -> Option<String> {
    match std::env::var(ENV_FORCE_SAVE) {
        Ok(path) => Some(path),
        // Unset (or unreadable) variable: nothing is forced.
        Err(_) => None,
    }
}
46
/// Errors that can occur when creating/manipulating a [`RecordingStream`].
///
/// Most variants wrap an underlying error type and are created through the `From`
/// conversions generated by `#[from]`, so they can be propagated with `?`.
#[derive(thiserror::Error, Debug)]
pub enum RecordingStreamError {
    /// Error within the underlying file sink.
    #[error("Failed to create the underlying file sink: {0}")]
    FileSink(#[from] re_log_encoding::FileSinkError),

    /// Error while converting data into a valid chunk.
    #[error("Failed to convert data to a valid chunk: {0}")]
    Chunk(#[from] ChunkError),

    /// Error within the underlying chunk batcher.
    #[error("Failed to spawn the underlying batcher: {0}")]
    ChunkBatcher(#[from] ChunkBatcherError),

    /// Error within the underlying serializer.
    #[error("Failed to serialize component data: {0}")]
    Serialization(#[from] SerializationError),

    /// Error spawning one of the background threads.
    #[error("Failed to spawn background thread '{name}': {err}")]
    SpawnThread {
        /// Name of the thread
        name: String,

        /// Inner error explaining why the thread failed to spawn.
        err: std::io::Error,
    },

    /// Error spawning a Rerun Viewer process.
    #[error(transparent)] // makes bubbling all the way up to main look nice
    SpawnViewer(#[from] crate::SpawnError),

    /// Failure to host a web viewer and/or Rerun server.
    #[cfg(feature = "web_viewer")]
    #[error(transparent)]
    WebSink(#[from] crate::web_viewer::WebViewerSinkError),

    /// An error occurred while attempting to use a [`re_importer::Importer`].
    #[cfg(feature = "importers")]
    #[error(transparent)]
    ImporterError(#[from] re_importer::ImporterError),

    /// Invalid gRPC server address.
    #[error(transparent)]
    UriError(#[from] re_uri::Error),

    /// The URI parsed correctly but does not point to a `/proxy` endpoint.
    #[error("not a `/proxy` endpoint")]
    NotAProxyEndpoint,

    /// Invalid bind IP.
    #[error(transparent)]
    InvalidAddress(#[from] std::net::AddrParseError),
}
102
/// Results that can occur when creating/manipulating a [`RecordingStream`].
///
/// Shorthand for a [`Result`] whose error type is [`RecordingStreamError`].
pub type RecordingStreamResult<T> = Result<T, RecordingStreamError>;
105
106// ---
107
/// Construct a [`RecordingStream`].
///
/// ``` no_run
/// # use re_sdk::RecordingStreamBuilder;
/// let rec = RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
///
/// Automatically sends a [`Chunk`] with the default [`RecordingInfo`] to
/// the sink, unless an explicit `recording_id` is set via [`RecordingStreamBuilder::recording_id`].
#[derive(Debug)]
pub struct RecordingStreamBuilder {
    /// Application the recording belongs to.
    application_id: ApplicationId,

    /// Kind of store being built: a recording by default, a blueprint after
    /// [`Self::blueprint`] was called.
    store_kind: StoreKind,

    /// Explicit recording id, if any. When `None`, a random one is generated in
    /// [`Self::into_args`].
    recording_id: Option<RecordingId>,

    /// Explicit store source, if any. When `None`, [`Self::into_args`] falls back to
    /// `StoreSource::RustSdk`.
    store_source: Option<StoreSource>,

    /// Whether logging is enabled when nothing else decides; see [`Self::default_enabled`].
    default_enabled: bool,

    /// Explicit on/off switch; when set it takes precedence over `default_enabled`.
    enabled: Option<bool>,

    /// Callbacks for the batcher thread.
    batcher_hooks: BatcherHooks,

    /// Explicit batcher configuration, if any.
    batcher_config: Option<ChunkBatcherConfig>,

    // Optional user-defined recording properties.
    //
    // `recording_info` is only handed out by `into_args` when `should_send_properties` is true.
    should_send_properties: bool,
    recording_info: RecordingInfo,

    /// Optional blueprint with activation settings.
    blueprint: Option<crate::blueprint::BlueprintOpts>,
}
138
139impl RecordingStreamBuilder {
140    /// Create a new [`RecordingStreamBuilder`] with the given [`ApplicationId`].
141    ///
142    /// The [`ApplicationId`] is usually the name of your app.
143    ///
144    /// ```no_run
145    /// # use re_sdk::RecordingStreamBuilder;
146    /// let rec = RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
147    /// # Ok::<(), Box<dyn std::error::Error>>(())
148    /// ```
149    //
150    // NOTE: track_caller so that we can see if we are being called from an official example.
151    #[track_caller]
152    pub fn new(application_id: impl Into<ApplicationId>) -> Self {
153        let application_id = application_id.into();
154
155        Self {
156            application_id,
157            store_kind: StoreKind::Recording,
158            recording_id: None,
159            store_source: None,
160
161            default_enabled: true,
162            enabled: None,
163
164            batcher_config: None,
165            batcher_hooks: BatcherHooks::NONE,
166
167            should_send_properties: true,
168            recording_info: RecordingInfo::new()
169                .with_start_time(re_sdk_types::components::Timestamp::now()),
170
171            blueprint: None,
172        }
173    }
174
175    /// Create a new [`RecordingStreamBuilder`] with the given [`StoreId`].
176    //
177    // NOTE: track_caller so that we can see if we are being called from an official example.
178    #[track_caller]
179    pub fn from_store_id(store_id: &StoreId) -> Self {
180        Self {
181            application_id: store_id.application_id().clone(),
182            store_kind: store_id.kind(),
183            recording_id: Some(store_id.recording_id().clone()),
184            store_source: None,
185
186            default_enabled: true,
187            enabled: None,
188
189            batcher_config: None,
190            batcher_hooks: BatcherHooks::NONE,
191
192            should_send_properties: true,
193            recording_info: RecordingInfo::new()
194                .with_start_time(re_sdk_types::components::Timestamp::now()),
195
196            blueprint: None,
197        }
198    }
199
200    /// Set whether or not Rerun is enabled by default.
201    ///
202    /// If the `RERUN` environment variable is set, it will override this.
203    ///
204    /// Set also: [`Self::enabled`].
205    #[inline]
206    pub fn default_enabled(mut self, default_enabled: bool) -> Self {
207        self.default_enabled = default_enabled;
208        self
209    }
210
211    /// Set whether or not Rerun is enabled.
212    ///
213    /// Setting this will ignore the `RERUN` environment variable.
214    ///
215    /// Set also: [`Self::default_enabled`].
216    #[inline]
217    pub fn enabled(mut self, enabled: bool) -> Self {
218        self.enabled = Some(enabled);
219        self
220    }
221
222    /// Set the `RecordingId` for this context.
223    ///
224    /// If you're logging from multiple processes and want all the messages to end up in the same
225    /// recording, you must make sure that they all set the same `RecordingId` using this function.
226    ///
227    /// Note that many stores can share the same [`ApplicationId`], but they all have
228    /// unique `RecordingId`s.
229    ///
230    /// The default is to use a random `RecordingId`.
231    ///
232    /// When explicitly setting a `RecordingId`, the initial chunk that contains the recording
233    /// properties will not be sent.
234    #[inline]
235    pub fn recording_id(mut self, recording_id: impl Into<RecordingId>) -> Self {
236        self.recording_id = Some(recording_id.into());
237        self.send_properties(false)
238    }
239
240    /// Sets an optional name for the recording.
241    #[inline]
242    pub fn recording_name(mut self, name: impl Into<String>) -> Self {
243        self.recording_info = self.recording_info.with_name(name.into());
244        self
245    }
246
247    /// Sets an optional name for the recording.
248    #[inline]
249    pub fn recording_started(mut self, started: impl Into<Timestamp>) -> Self {
250        self.recording_info = self.recording_info.with_start_time(started);
251        self
252    }
253
254    /// Whether the [`RecordingInfo`] chunk should be sent.
255    #[inline]
256    pub fn send_properties(mut self, should_send: bool) -> Self {
257        self.should_send_properties = should_send;
258        self
259    }
260
261    /// Set a blueprint and make it active immediately.
262    ///
263    /// Use this when you want your blueprint to be shown immediately.
264    ///
265    /// To send a blueprint to an existing recording, use [`RecordingStream::send_blueprint`] instead.
266    #[inline]
267    pub fn with_blueprint(mut self, blueprint: crate::blueprint::Blueprint) -> Self {
268        self.blueprint = Some(crate::blueprint::BlueprintOpts {
269            blueprint,
270            activation: crate::blueprint::BlueprintActivation {
271                make_active: true,
272                make_default: true,
273            },
274        });
275        self
276    }
277
278    /// Set a default blueprint for this application.
279    ///
280    /// If the application already has an active blueprint, the new blueprint won't become
281    /// active until the user resets the blueprint. If you want to activate the blueprint
282    /// immediately, use [`Self::with_blueprint`] instead.
283    ///
284    /// To send a blueprint to an existing recording, use [`RecordingStream::send_blueprint`] instead.
285    #[inline]
286    pub fn with_default_blueprint(mut self, blueprint: crate::blueprint::Blueprint) -> Self {
287        self.blueprint = Some(crate::blueprint::BlueprintOpts {
288            blueprint,
289            activation: crate::blueprint::BlueprintActivation {
290                make_active: false,
291                make_default: true,
292            },
293        });
294        self
295    }
296
297    /// Specifies the configuration of the internal data batching mechanism.
298    ///
299    /// If not set, the default configuration for the currently active sink will be used.
300    /// Any environment variables as specified on [`ChunkBatcherConfig`] will always override respective settings.
301    ///
302    /// See [`ChunkBatcher`] & [`ChunkBatcherConfig`] for more information.
303    #[inline]
304    pub fn batcher_config(mut self, config: ChunkBatcherConfig) -> Self {
305        self.batcher_config = Some(config);
306        self
307    }
308
309    /// Specifies callbacks for the batcher thread.
310    ///
311    /// See [`ChunkBatcher`] & [`BatcherHooks`] for more information.
312    #[inline]
313    pub fn batcher_hooks(mut self, hooks: BatcherHooks) -> Self {
314        self.batcher_hooks = hooks;
315        self
316    }
317
318    #[doc(hidden)]
319    #[inline]
320    pub fn store_source(mut self, store_source: StoreSource) -> Self {
321        self.store_source = Some(store_source);
322        self
323    }
324
325    #[doc(hidden)]
326    #[inline]
327    pub fn blueprint(mut self) -> Self {
328        self.store_kind = StoreKind::Blueprint;
329        self
330    }
331
332    /// Creates a new [`RecordingStream`] that starts in a buffering state (RAM).
333    ///
334    /// ## Example
335    ///
336    /// ```
337    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").buffered()?;
338    /// # Ok::<(), Box<dyn std::error::Error>>(())
339    /// ```
340    pub fn buffered(self) -> RecordingStreamResult<RecordingStream> {
341        self.create_recording_stream("buffered", || {
342            Ok(Box::new(crate::log_sink::BufferedSink::new()))
343        })
344    }
345
346    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
347    /// [`crate::log_sink::MemorySink`].
348    ///
349    /// ## Example
350    ///
351    /// ```
352    /// # fn log_data(_: &re_sdk::RecordingStream) { }
353    ///
354    /// let (rec, storage) = re_sdk::RecordingStreamBuilder::new("rerun_example_app").memory()?;
355    ///
356    /// log_data(&rec);
357    ///
358    /// let data = storage.take();
359    ///
360    /// # Ok::<(), Box<dyn std::error::Error>>(())
361    /// ```
362    pub fn memory(
363        self,
364    ) -> RecordingStreamResult<(RecordingStream, crate::log_sink::MemorySinkStorage)> {
365        let rec = self.create_recording_stream("memory", || {
366            Ok(Box::new(crate::log_sink::BufferedSink::new()))
367        })?;
368
369        let sink = crate::log_sink::MemorySink::new(rec.clone());
370        let storage = sink.buffer();
371        // Using set_sink here is necessary because the MemorySink needs to know
372        // it's own RecordingStream, which means we can't use `new` above.
373        // This has the downside of a bit of creation overhead and an extra StoreInfo
374        // message being sent to the sink.
375        // TODO(jleibs): Figure out a cleaner way to handle this.
376        rec.set_sink(Box::new(sink));
377        Ok((rec, storage))
378    }
379
380    /// Creates a new [`RecordingStream`] pre-configured to stream data to multiple sinks.
381    ///
382    /// Currently only supports [`GrpcSink`][grpc_sink] and [`FileSink`][file_sink].
383    ///
384    /// If the batcher configuration has not been set explicitly or by environment variables,
385    /// this will change the batcher configuration to a conservative (less often flushing) mix of
386    /// default configurations of the underlying sinks.
387    ///
388    /// [grpc_sink]: crate::sink::GrpcSink
389    /// [file_sink]: crate::sink::FileSink
390    pub fn set_sinks(
391        self,
392        sinks: impl crate::sink::IntoMultiSink,
393    ) -> RecordingStreamResult<RecordingStream> {
394        self.create_recording_stream("set_sinks", || Ok(Box::new(sinks.into_multi_sink())))
395    }
396
397    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
398    /// remote Rerun instance.
399    ///
400    /// See also [`Self::connect_grpc_opts`] if you wish to configure the connection.
401    ///
402    /// ## Example
403    ///
404    /// ```no_run
405    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").connect_grpc()?;
406    /// # Ok::<(), Box<dyn std::error::Error>>(())
407    /// ```
408    pub fn connect_grpc(self) -> RecordingStreamResult<RecordingStream> {
409        self.connect_grpc_opts(format!(
410            "rerun+http://127.0.0.1:{}/proxy",
411            crate::DEFAULT_SERVER_PORT
412        ))
413    }
414
415    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
416    /// remote Rerun instance.
417    ///
418    /// ## Example
419    ///
420    /// ```no_run
421    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
422    ///     .connect_grpc_opts("rerun+http://127.0.0.1:9876/proxy")?;
423    /// # Ok::<(), Box<dyn std::error::Error>>(())
424    /// ```
425    pub fn connect_grpc_opts(
426        self,
427        url: impl Into<String>,
428    ) -> RecordingStreamResult<RecordingStream> {
429        self.create_recording_stream("connect_grpc", || {
430            let url: String = url.into();
431            let re_uri::RedapUri::Proxy(uri) = url.as_str().parse()? else {
432                return Err(RecordingStreamError::NotAProxyEndpoint);
433            };
434            Ok(Box::new(crate::log_sink::GrpcSink::new(uri)))
435        })
436    }
437
438    #[cfg(feature = "server")]
439    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
440    /// locally hosted gRPC server.
441    ///
442    /// The server is hosted on the default IP and port, and may be connected to by any SDK or Viewer
443    /// at `rerun+http://127.0.0.1:9876/proxy` or by just running `rerun --connect`.
444    ///
445    /// To configure the gRPC server's IP and port, use [`Self::serve_grpc_opts`] instead.
446    ///
447    /// The gRPC server will buffer in memory so that late connecting viewers will still get all the data.
448    /// You can control the amount of data buffered by the gRPC server using [`Self::serve_grpc_opts`].
449    /// Once the memory limit is reached, the earliest logged data
450    /// will be dropped. Static data is never dropped.
451    ///
452    /// NOTE: When the `RecordingStream` is dropped or disconnected, it will shut down the gRPC server.
453    pub fn serve_grpc(self) -> RecordingStreamResult<RecordingStream> {
454        use re_grpc_server::ServerOptions;
455
456        self.serve_grpc_opts(
457            "0.0.0.0",
458            crate::DEFAULT_SERVER_PORT,
459            ServerOptions {
460                memory_limit: re_memory::MemoryLimit::from_fraction_of_total(0.25),
461                ..Default::default()
462            },
463        )
464    }
465
466    #[cfg(feature = "server")]
467    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
468    /// locally hosted gRPC server.
469    ///
470    /// The server is hosted on the given `bind_ip` and `port`, may be connected to by any SDK or Viewer
471    /// at `rerun+http://{bind_ip}:{port}/proxy`.
472    ///
473    /// `0.0.0.0` is a good default for `bind_ip`.
474    ///
475    /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
476    /// You can control the amount of data buffered by the gRPC server with the `server_options` argument.
477    /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
478    ///
479    /// NOTE: When the `RecordingStream` is dropped or disconnected, it will shut down the gRPC server.
480    pub fn serve_grpc_opts(
481        self,
482        bind_ip: impl AsRef<str>,
483        port: u16,
484        server_options: re_grpc_server::ServerOptions,
485    ) -> RecordingStreamResult<RecordingStream> {
486        self.create_recording_stream("serve_grpc", || {
487            Ok(Box::new(crate::grpc_server::GrpcServerSink::new(
488                bind_ip.as_ref(),
489                port,
490                server_options,
491            )?))
492        })
493    }
494
495    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to an
496    /// RRD file on disk.
497    ///
498    /// The Rerun Viewer is able to read continuously from the resulting rrd file while it is being written.
499    /// However, depending on your OS and configuration, changes may not be immediately visible due to file caching.
500    /// This is a common issue on Windows and (to a lesser extent) on `MacOS`.
501    ///
502    /// ## Example
503    ///
504    /// ```no_run
505    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
506    /// # Ok::<(), Box<dyn std::error::Error>>(())
507    /// ```
508    #[cfg(not(target_arch = "wasm32"))]
509    pub fn save(
510        self,
511        path: impl Into<std::path::PathBuf>,
512    ) -> RecordingStreamResult<RecordingStream> {
513        self.create_recording_stream("save", || Ok(Box::new(crate::sink::FileSink::new(path)?)))
514    }
515
516    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to stdout.
517    ///
518    /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
519    /// default back to `buffered` mode, in order not to break the user's terminal.
520    ///
521    /// ## Example
522    ///
523    /// ```no_run
524    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").stdout()?;
525    /// # Ok::<(), Box<dyn std::error::Error>>(())
526    /// ```
527    #[cfg(not(target_arch = "wasm32"))]
528    pub fn stdout(self) -> RecordingStreamResult<RecordingStream> {
529        if std::io::stdout().is_terminal() {
530            re_log::debug!("Ignored call to stdout() because stdout is a terminal");
531            return self.buffered();
532        }
533
534        self.create_recording_stream("stdout", || Ok(Box::new(crate::sink::FileSink::stdout()?)))
535    }
536
537    /// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
538    /// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over gRPC.
539    ///
540    /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
541    /// that viewer instead of starting a new one.
542    ///
543    /// See also [`Self::spawn_opts`] if you wish to configure the behavior of thew Rerun process
544    /// as well as the underlying connection.
545    ///
546    /// ## Example
547    ///
548    /// ```no_run
549    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").spawn()?;
550    /// # Ok::<(), Box<dyn std::error::Error>>(())
551    /// ```
552    pub fn spawn(self) -> RecordingStreamResult<RecordingStream> {
553        self.spawn_opts(&Default::default())
554    }
555
556    /// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
557    /// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over gRPC.
558    ///
559    /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
560    /// that viewer instead of starting a new one.
561    ///
562    /// The behavior of the spawned Viewer can be configured via `opts`.
563    /// If you're fine with the default behavior, refer to the simpler [`Self::spawn`].
564    ///
565    /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
566    /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
567    /// call to `flush` to block indefinitely if a connection cannot be established.
568    ///
569    /// ## Example
570    ///
571    /// ```no_run
572    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
573    ///     .spawn_opts(&re_sdk::SpawnOptions::default())?;
574    /// # Ok::<(), Box<dyn std::error::Error>>(())
575    /// ```
576    pub fn spawn_opts(self, opts: &crate::SpawnOptions) -> RecordingStreamResult<RecordingStream> {
577        if !self.is_enabled() {
578            re_log::debug!("Rerun disabled - call to spawn() ignored");
579            return Ok(RecordingStream::disabled());
580        }
581
582        // NOTE: If `_RERUN_TEST_FORCE_SAVE` is set, all recording streams will write to disk no matter
583        // what, thus spawning a viewer is pointless (and probably not intended).
584        if forced_sink_path().is_some() {
585            let url = format!("rerun+http://{}/proxy", opts.connect_addr());
586            return self.connect_grpc_opts(url);
587        }
588
589        // Spawn viewer and connect normally.
590        // spawn() returns the actual port used, which may differ from opts.port when --new picks a free port.
591        let actual_port = crate::spawn(opts)?;
592        let addr = std::net::SocketAddr::new(
593            std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
594            actual_port,
595        );
596        self.connect_grpc_opts(format!("rerun+http://{addr}/proxy"))
597    }
598
599    /// Returns whether or not logging is enabled, a [`StoreInfo`], the associated batcher
600    /// configuration, and the blueprint.
601    ///
602    /// This can be used to then construct a [`RecordingStream`] manually using
603    /// [`RecordingStream::new`].
604    pub fn into_args(
605        self,
606    ) -> (
607        bool,
608        StoreInfo,
609        Option<RecordingInfo>,
610        Option<ChunkBatcherConfig>,
611        BatcherHooks,
612        Option<crate::blueprint::BlueprintOpts>,
613    ) {
614        let enabled = self.is_enabled();
615
616        let Self {
617            application_id,
618            store_kind,
619            recording_id,
620            store_source,
621            default_enabled: _,
622            enabled: _,
623            batcher_config,
624            batcher_hooks,
625            should_send_properties,
626            recording_info,
627            blueprint,
628        } = self;
629
630        let store_id = StoreId::new(
631            store_kind,
632            application_id,
633            recording_id.unwrap_or_else(RecordingId::random),
634        );
635        let store_source = store_source.unwrap_or_else(|| StoreSource::RustSdk {
636            rustc_version: env!("RE_BUILD_RUSTC_VERSION").into(),
637            llvm_version: env!("RE_BUILD_LLVM_VERSION").into(),
638        });
639
640        let store_info = StoreInfo::new(store_id, store_source);
641
642        (
643            enabled,
644            store_info,
645            should_send_properties.then_some(recording_info),
646            batcher_config,
647            batcher_hooks,
648            blueprint,
649        )
650    }
651
652    fn create_recording_stream(
653        self,
654        function_name: &'static str,
655        sink_factory: impl FnOnce() -> RecordingStreamResult<Box<dyn LogSink>>,
656    ) -> RecordingStreamResult<RecordingStream> {
657        let (enabled, store_info, properties, batcher_config, batcher_hooks, blueprint_opts) =
658            self.into_args();
659        if enabled {
660            let stream = RecordingStream::new(
661                store_info,
662                properties,
663                batcher_config,
664                batcher_hooks,
665                sink_factory()?,
666            )?;
667            if let Some(blueprint_opts) = blueprint_opts {
668                blueprint_opts.send(&stream)?;
669            }
670            Ok(stream)
671        } else {
672            re_log::debug!("Rerun disabled - call to {function_name}() ignored");
673            Ok(RecordingStream::disabled())
674        }
675    }
676
677    /// Internal check for whether or not logging is enabled using explicit/default settings & env var.
678    fn is_enabled(&self) -> bool {
679        self.enabled
680            .unwrap_or_else(|| crate::decide_logging_enabled(self.default_enabled))
681    }
682}
683
684// ----------------------------------------------------------------------------
685
/// A [`RecordingStream`] handles everything related to logging data into Rerun.
///
/// You can construct a new [`RecordingStream`] using [`RecordingStreamBuilder`] or
/// [`RecordingStream::new`].
///
/// ## Sinks
///
/// Data is logged into Rerun via [`LogSink`]s.
///
/// The underlying [`LogSink`] of a [`RecordingStream`] can be changed at any point during its
/// lifetime by calling [`RecordingStream::set_sink`] or one of the higher level helpers
/// ([`RecordingStream::connect_grpc`], [`RecordingStream::memory`],
/// [`RecordingStream::save`], [`RecordingStream::disconnect`]).
///
/// See [`RecordingStream::set_sink`] for more information.
///
/// ## Multithreading and ordering
///
/// [`RecordingStream`] can be cheaply cloned and used freely across any number of threads.
///
/// Internally, all operations are linearized into a pipeline:
/// - All operations sent by a given thread will take effect in the same exact order as that
///   thread originally sent them in, from its point of view.
/// - There isn't any well defined global order across multiple threads.
///
/// This means that e.g. flushing the pipeline ([`Self::flush_blocking`]) guarantees that all previous data sent by the calling thread
/// has been recorded and (if applicable) flushed to the underlying OS-managed file descriptor,
/// but other threads may still have data in flight.
///
/// ## Shutdown
///
/// The [`RecordingStream`] can only be shutdown by dropping all instances of it, at which point
/// it will automatically take care of flushing any pending data that might remain in the pipeline.
///
/// Shutting down cannot ever block.
//
// NOTE(review): the `Drop` impl of `RecordingStream` waits for pending importer threads when the
// last strong handle goes away -- confirm that the "cannot ever block" claim above still holds.
#[derive(Clone)]
pub struct RecordingStream {
    /// Either a strong (`Left`) or a weak (`Right`) handle to the shared stream state.
    ///
    /// Weak handles are created by [`RecordingStream::clone_weak`].
    inner: Either<Arc<RecordingStreamInner>, Weak<RecordingStreamInner>>,
}
725
726impl RecordingStream {
727    /// Passes a reference to the [`RecordingStreamInner`], if it exists.
728    ///
729    /// This works whether the underlying stream is strong or weak.
730    #[inline]
731    fn with<F: FnOnce(&RecordingStreamInner) -> R, R>(&self, f: F) -> Option<R> {
732        match &self.inner {
733            Either::Left(strong) => Some(f(strong)),
734            Either::Right(weak) => Some(f(&*weak.upgrade()?)),
735        }
736    }
737
738    /// Clones the [`RecordingStream`] without incrementing the refcount.
739    ///
740    /// Useful e.g. if you want to make sure that a detached thread won't prevent the [`RecordingStream`]
741    /// from flushing during shutdown.
742    //
743    // TODO(#5335): shutdown flushing behavior is too brittle.
744    #[inline]
745    pub fn clone_weak(&self) -> Self {
746        Self {
747            inner: match &self.inner {
748                Either::Left(strong) => Either::Right(Arc::downgrade(strong)),
749                Either::Right(weak) => Either::Right(Weak::clone(weak)),
750            },
751        }
752    }
753
754    /// Returns the current reference count of the [`RecordingStream`].
755    ///
756    /// Returns 0 if the stream was created by [`RecordingStream::disabled()`],
757    /// or if it is a [`clone_weak()`][Self::clone_weak] of a stream whose strong instances
758    /// have all been dropped.
759    pub fn ref_count(&self) -> usize {
760        match &self.inner {
761            Either::Left(strong) => Arc::strong_count(strong),
762            Either::Right(weak) => weak.strong_count(),
763        }
764    }
765}
766
767// TODO(#5335): shutdown flushing behavior is too brittle.
768impl Drop for RecordingStream {
769    #[inline]
770    fn drop(&mut self) {
771        // If this holds the last strong handle to the recording, make sure that all pending
772        // importer threads that were started from the SDK actually run to completion (they
773        // all hold a weak handle to this very recording!).
774        //
775        // NOTE: It's very important to do so from the `Drop` implementation of `RecordingStream`
776        // itself, because the importer threads -- by definition -- will have to send data into
777        // this very recording, therefore we must make sure that at least one strong handle still lives
778        // on until they are all finished.
779        if let Either::Left(strong) = &mut self.inner
780            && Arc::strong_count(strong) == 1
781        {
782            // Keep the recording alive until all importers are finished.
783            self.with(|inner| inner.wait_for_importers());
784        }
785    }
786}
787
/// Shared state behind every strong/weak [`RecordingStream`] handle.
///
/// Owns the command channel, the chunk batcher and the handle of the thread that forwards data
/// to the sink. Dropping it flushes the batcher and shuts that thread down.
struct RecordingStreamInner {
    /// Store metadata; sent to the sink as a `SetStoreInfo` message on construction.
    store_info: StoreInfo,

    /// Optional recording properties; when set, they are pushed into the batcher as a chunk on
    /// construction.
    recording_info: Option<RecordingInfo>,

    /// Monotonic counter used as the source for `log_tick` values (incremented atomically).
    tick: AtomicI64,

    /// The one and only entrypoint into the pipeline: this is _never_ cloned nor publicly exposed,
    /// therefore the `Drop` implementation is guaranteed that no more data can come in while it's
    /// running.
    cmds_tx: re_quota_channel::Sender<Command>,

    /// Batches rows into chunks; its output is consumed by the forwarding thread.
    batcher: ChunkBatcher,

    /// Join handle of the forwarding thread. It is `take()`n and joined when this value is dropped.
    batcher_to_sink_handle: Option<std::thread::JoinHandle<()>>,

    /// Mirror of the batcher's currently active configuration.
    current_batcher_config: Mutex<ChunkBatcherConfig>,

    /// If true, any new sink will update the batcher's configuration (as far as possible).
    ///
    /// This is set when no explicit batcher configuration was provided at construction.
    sink_dependent_batcher_config: bool,

    /// Keeps track of the top-level threads that were spawned in order to execute the importer
    /// machinery in the context of this `RecordingStream`.
    ///
    /// See [`RecordingStream::log_file_from_path`] and [`RecordingStream::log_file_from_contents`].
    importer_handles: Mutex<Vec<std::thread::JoinHandle<()>>>,

    /// Process id at construction time; used to detect that we are running in a forked child.
    pid_at_creation: u32,
}
815
816impl fmt::Debug for RecordingStreamInner {
817    #[inline]
818    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
819        f.debug_struct("RecordingStreamInner")
820            .field("store_id", &self.store_info.store_id)
821            .finish_non_exhaustive()
822    }
823}
824
825impl Drop for RecordingStreamInner {
826    fn drop(&mut self) {
827        if self.is_forked_child() {
828            re_log::error_once!(
829                "Fork detected while dropping RecordingStreamInner. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
830            );
831            return;
832        }
833
834        self.wait_for_importers();
835
836        // NOTE: The command channel is private, if we're here, nothing is currently capable of
837        // sending data down the pipeline.
838        let timeout = Duration::MAX;
839        if let Err(err) = self.batcher.flush_blocking(timeout) {
840            re_log::error!("Failed to flush batcher: {err}");
841        }
842        self.cmds_tx.send(Command::PopPendingChunks).ok();
843        self.cmds_tx.send(Command::Shutdown).ok();
844        if let Some(handle) = self.batcher_to_sink_handle.take() {
845            handle.join().ok();
846        }
847    }
848}
849
850fn resolve_batcher_config(
851    batcher_config: Option<ChunkBatcherConfig>,
852    sink: &dyn LogSink,
853) -> ChunkBatcherConfig {
854    if let Some(explicit_batcher_config) = batcher_config {
855        explicit_batcher_config
856    } else {
857        let default_config = sink.default_batcher_config();
858        default_config.apply_env().unwrap_or_else(|err| {
859            re_log::error!("Failed to parse ChunkBatcherConfig from env: {err}");
860            default_config
861        })
862    }
863}
864
865/// Warns once if `sink` defers finalization to shutdown (e.g. a file sink) and is paired with a
866/// flush-on-every-row batcher config such as [`ChunkBatcherConfig::ALWAYS_TEST_ONLY`].
867///
868/// These sinks have to keep per-chunk metadata in memory until the footer can be written at
869/// process exit. A flush-on-every-row config produces one chunk per row, so for long-running
870/// recordings this can drive memory usage through the roof.
871fn warn_if_problematic_file_sink_config(config: &ChunkBatcherConfig, sink: &dyn LogSink) {
872    if !sink.defers_finalization_to_shutdown() {
873        return;
874    }
875
876    if !config.always_flushes() {
877        return;
878    }
879
880    // Snippet-roundtrip tests intentionally pair this config with a file sink to exercise the
881    // per-row serialization path. The warning is correct in principle but noisy (and fatal under
882    // `RERUN_PANIC_ON_WARN`) for that controlled setup, so suppress it when the test harness
883    // signals strict-test mode.
884    if re_log::env_var_is_truthy("RERUN_STRICT") {
885        return;
886    }
887
888    re_log::warn_once!(
889        "ChunkBatcherConfig::ALWAYS_TEST_ONLY (or an equivalent flush-on-every-row config) is \
890         being used with a file sink. This produces one chunk per row, and the file's footer \
891         cannot be written until the SDK process exits — so per-chunk metadata accumulates in \
892         memory for the entire lifetime of the recording, which can blow up memory usage. \
893         Use the default `ChunkBatcherConfig` for production workloads, or \
894         `ChunkBatcherConfig::LOW_LATENCY` if you need fast flushing."
895    );
896}
897
898impl RecordingStreamInner {
    /// Builds the whole logging pipeline for a recording.
    ///
    /// In order, this:
    /// 1. resolves the batcher configuration (explicit, or derived from `sink`),
    /// 2. creates the batcher,
    /// 3. sends the `StoreInfo` to `sink`,
    /// 4. creates the command channel and spawns the forwarding thread, which takes ownership
    ///    of `sink`,
    /// 5. pushes the `RecordingInfo` (if any) into the batcher.
    ///
    /// # Errors
    ///
    /// Fails if the batcher cannot be created, if the forwarding thread cannot be spawned, or if
    /// the `RecordingInfo` chunk cannot be built.
    fn new(
        store_info: StoreInfo,
        recording_info: Option<RecordingInfo>,
        batcher_config: Option<ChunkBatcherConfig>,
        batcher_hooks: BatcherHooks,
        sink: Box<dyn LogSink>,
    ) -> RecordingStreamResult<Self> {
        // Without an explicit config, the batcher config follows whatever the sink prefers.
        let sink_dependent_batcher_config = batcher_config.is_none();
        let batcher_config = resolve_batcher_config(batcher_config, &*sink);

        warn_if_problematic_file_sink_config(&batcher_config, &*sink);

        // Grab the release hook before the hooks are moved into the batcher: the forwarding
        // thread needs it too.
        let on_release = batcher_hooks.on_release.clone();
        let batcher = ChunkBatcher::new(batcher_config, batcher_hooks)?;

        {
            re_log::debug!(
                store_id = ?store_info.store_id,
                "Setting StoreInfo",
            );
            sink.send(
                re_log_types::SetStoreInfo {
                    row_id: *RowId::new(),
                    info: store_info.clone(),
                }
                .into(),
            );
        }

        // The command channel is given half of the batcher's `max_bytes_in_flight`.
        let (cmds_tx, cmds_rx) = re_quota_channel::channel(
            "RecordingStream::cmds",
            batcher_config.max_bytes_in_flight / 2,
        );

        let batcher_to_sink_handle = {
            const NAME: &str = "RecordingStream::batcher_to_sink";
            std::thread::Builder::new()
                .name(NAME.into())
                .spawn({
                    let info = store_info.clone();
                    let batcher = batcher.clone();
                    move || forwarding_thread(info, sink, cmds_rx, batcher.chunks(), on_release)
                })
                .map_err(|err| RecordingStreamError::SpawnThread {
                    name: NAME.into(),
                    err,
                })?
        };

        if let Some(recording_info) = recording_info.as_ref() {
            // We pre-populate the batcher with a chunk that contains the `RecordingInfo`
            // so that it automatically gets sent to the sink.

            re_log::trace!(recording_info = ?recording_info, "Adding RecordingInfo to batcher");

            let chunk = Chunk::builder(EntityPath::properties())
                .with_archetype(RowId::new(), TimePoint::default(), recording_info)
                .build()?;

            batcher.push_chunk(chunk);
        }

        Ok(Self {
            store_info,
            recording_info,
            tick: AtomicI64::new(0),
            cmds_tx,
            batcher,
            batcher_to_sink_handle: Some(batcher_to_sink_handle),
            current_batcher_config: Mutex::new(batcher_config),
            sink_dependent_batcher_config,
            importer_handles: Mutex::new(Vec::new()),
            pid_at_creation: std::process::id(),
        })
    }
974
975    #[inline]
976    pub fn is_forked_child(&self) -> bool {
977        self.pid_at_creation != std::process::id()
978    }
979
980    /// Make sure all pending top-level importer threads that were started from the SDK run to completion.
981    //
982    // TODO(cmc): At some point we might want to make it configurable, though I cannot really
983    // think of a use case where you'd want to drop those threads immediately upon
984    // disconnection.
985    fn wait_for_importers(&self) {
986        let importer_handles = std::mem::take(&mut *self.importer_handles.lock());
987        for handle in importer_handles {
988            handle.join().ok();
989        }
990    }
991}
992
/// One-shot, sendable closure that is handed a shared reference to a sink.
///
/// Payload of [`Command::InspectSink`].
type InspectSinkFn = Box<dyn FnOnce(&dyn LogSink) + Send + 'static>;

/// Outcome of a flush request, as reported back through [`Command::Flush`]'s `on_done` channel.
type FlushResult = Result<(), SinkFlushError>;
996
/// Messages sent over the private command channel to the forwarding thread.
enum Command {
    /// A log message to be handed to the current sink.
    RecordMsg(LogMsg),

    /// Replace the current sink with `new_sink`.
    ///
    /// The previous sink's backlog is drained and the previous sink is flushed (bounded by
    /// `timeout`) as part of the swap.
    SwapSink {
        new_sink: Box<dyn LogSink>,
        timeout: Duration,
    },

    /// A closure to run against a sink.
    // NOTE(review): presumably invoked with the currently active sink -- confirm in the handler.
    //
    // TODO(#10444): This should go away with more explicit sinks.
    InspectSink(InspectSinkFn),

    /// Flush request; the result is reported back through `on_done`.
    // NOTE(review): presumably `timeout` bounds the sink flush -- confirm in the handler.
    Flush {
        on_done: Sender<FlushResult>,
        timeout: Duration,
    },

    /// Drop any sinks whose on-disk format only finalizes at shutdown (i.e. file-like sinks with
    /// footers), while leaving streaming sinks (e.g. gRPC) untouched. Used by Python's
    /// `RecordingStream.__exit__` to ensure file-backed recordings are consumable as soon as
    /// the `with`-block exits, without waiting for GC.
    FinalizeDeferredSinks {
        on_done: Sender<()>,
        timeout: Duration,
    },

    /// Sent during teardown, right after the batcher has been flushed.
    // NOTE(review): presumably asks the forwarding thread to drain the chunks still pending in
    // the batcher's output -- confirm in the handler.
    PopPendingChunks,

    /// Sent last during teardown, right before the forwarding thread is joined.
    Shutdown,
}
1021
1022impl std::fmt::Debug for Command {
1023    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1024        match self {
1025            Self::RecordMsg(msg) => f.debug_tuple("RecordMsg").field(msg).finish(),
1026            Self::SwapSink { .. } => f.debug_struct("SwapSink").finish_non_exhaustive(),
1027            Self::InspectSink(_) => f.debug_tuple("InspectSink").finish_non_exhaustive(),
1028            Self::Flush { .. } => f.debug_struct("Flush").finish_non_exhaustive(),
1029            Self::FinalizeDeferredSinks { .. } => f
1030                .debug_struct("FinalizeDeferredSinks")
1031                .finish_non_exhaustive(),
1032            Self::PopPendingChunks => write!(f, "PopPendingChunks"),
1033            Self::Shutdown => write!(f, "Shutdown"),
1034        }
1035    }
1036}
1037
1038impl re_byte_size::SizeBytes for Command {
1039    fn heap_size_bytes(&self) -> u64 {
1040        match self {
1041            Self::RecordMsg(msg) => msg.heap_size_bytes(),
1042            Self::SwapSink { .. }
1043            | Self::InspectSink(_)
1044            | Self::Flush { .. }
1045            | Self::FinalizeDeferredSinks { .. }
1046            | Self::PopPendingChunks
1047            | Self::Shutdown => 0,
1048        }
1049    }
1050}
1051
1052impl Command {
1053    fn flush(timeout: Duration) -> (Self, Receiver<FlushResult>) {
1054        let (on_done, rx) = crossbeam::channel::bounded(1); // oneshot
1055        (Self::Flush { on_done, timeout }, rx)
1056    }
1057
1058    fn finalize_deferred_sinks(timeout: Duration) -> (Self, Receiver<()>) {
1059        let (on_done, rx) = crossbeam::channel::bounded(1); // oneshot
1060        (Self::FinalizeDeferredSinks { on_done, timeout }, rx)
1061    }
1062}
1063
1064impl RecordingStream {
1065    /// Creates a new [`RecordingStream`] with a given [`StoreInfo`] and [`LogSink`].
1066    ///
1067    /// You can create a [`StoreInfo`] with [`crate::new_store_info`];
1068    ///
1069    /// The [`StoreInfo`] is immediately sent to the sink in the form of a
1070    /// [`re_log_types::SetStoreInfo`].
1071    ///
1072    /// You can find sinks in [`crate::sink`].
1073    ///
1074    /// If no batcher configuration is provided, the default batcher configuration for the sink will be used.
1075    /// Any environment variables as specified in [`ChunkBatcherConfig`] will always override respective settings.
1076    ///
1077    /// See also: [`RecordingStreamBuilder`].
1078    #[must_use = "Recording will get closed automatically once all instances of this object have been dropped"]
1079    pub fn new(
1080        store_info: StoreInfo,
1081        recording_info: Option<RecordingInfo>,
1082        batcher_config: Option<ChunkBatcherConfig>,
1083        batcher_hooks: BatcherHooks,
1084        sink: Box<dyn LogSink>,
1085    ) -> RecordingStreamResult<Self> {
1086        let sink = store_info
1087            .store_id
1088            .is_recording()
1089            .then(forced_sink_path)
1090            .flatten()
1091            .map_or(sink, |path| {
1092                re_log::info!("Forcing FileSink because of env-var {ENV_FORCE_SAVE}={path:?}");
1093                Box::new(
1094                    crate::sink::FileSink::new(path)
1095                        .expect("Failed to create FileSink for forced test path"),
1096                ) as Box<dyn LogSink>
1097            });
1098
1099        let stream = RecordingStreamInner::new(
1100            store_info,
1101            recording_info,
1102            batcher_config,
1103            batcher_hooks,
1104            sink,
1105        )
1106        .map(|inner| Self {
1107            inner: Either::Left(Arc::new(inner)),
1108        })?;
1109
1110        Ok(stream)
1111    }
1112
    /// Creates a new no-op [`RecordingStream`] that drops all logging messages, doesn't allocate
    /// any memory and doesn't spawn any threads.
    ///
    /// Internally this is a weak handle that points to nothing (`Weak::new()`), so it can never
    /// be upgraded to a live recording.
    ///
    /// [`Self::is_enabled`] will return `false`.
    pub const fn disabled() -> Self {
        Self {
            inner: Either::Right(Weak::new()),
        }
    }
1122}
1123
1124impl RecordingStream {
    /// Log data to Rerun.
    ///
    /// This is the main entry point for logging data to Rerun. It can be used to log anything
    /// that implements the [`AsComponents`] trait, such as any [archetype](https://docs.rs/rerun/latest/rerun/archetypes/index.html)
    /// or individual [component](https://docs.rs/rerun/latest/rerun/components/index.html).
    ///
    /// The data will be timestamped automatically based on the [`RecordingStream`]'s internal clock.
    /// See [`RecordingStream::set_time_sequence`] etc for more information.
    ///
    /// The entity path can either be a string
    /// (with special characters escaped, split on unescaped slashes)
    /// or an [`EntityPath`] constructed with [`crate::entity_path`].
    /// See <https://www.rerun.io/docs/concepts/logging-and-ingestion/entity-path> for more on entity paths.
    ///
    /// This is equivalent to calling [`Self::log_with_static`] with `static_` set to `false`.
    /// See also: [`Self::log_static`] for logging static data.
    ///
    /// Internally, the stream will automatically micro-batch multiple log calls to optimize
    /// transport.
    /// See [SDK Micro Batching] for more information.
    ///
    /// # Example
    /// ```ignore
    /// # use rerun;
    /// # let (rec, storage) = rerun::RecordingStreamBuilder::new("rerun_example_points3d_simple").memory()?;
    /// rec.log(
    ///     "my/points",
    ///     &rerun::Points3D::new([(0.0, 0.0, 0.0), (1.0, 1.0, 1.0)]),
    /// )?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    ///
    /// # Thread Safety
    ///
    /// While [`RecordingStream`] is `Send + Sync` and safe to use from multiple threads,
    /// **avoid calling `log` while holding a [`std::sync::Mutex`]**. The Rerun SDK uses
    /// [rayon](https://docs.rs/rayon) internally for parallel processing, and rayon's
    /// work-stealing behavior can cause deadlocks when combined with held mutexes
    /// (see [rayon#592](https://github.com/rayon-rs/rayon/issues/592)).
    ///
    /// ```ignore
    /// // ❌ Don't do this - potential deadlock:
    /// let guard = mutex.lock().unwrap();
    /// stream.log("data", &rerun::Points3D::new(points))?;
    /// drop(guard);
    ///
    /// // ✅ Do this instead - extract data first:
    /// let points = {
    ///     let guard = mutex.lock().unwrap();
    ///     guard.points.clone()
    /// };
    /// stream.log("data", &rerun::Points3D::new(points))?;
    /// ```
    ///
    /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
    #[inline]
    pub fn log<AS: ?Sized + AsComponents>(
        &self,
        ent_path: impl Into<EntityPath>,
        as_components: &AS,
    ) -> RecordingStreamResult<()> {
        self.log_with_static(ent_path, false, as_components)
    }
1188
1189    /// Lower-level logging API to provide data spanning multiple timepoints.
1190    ///
1191    /// Unlike the regular `log` API, which is row-oriented, this API lets you submit the data
1192    /// in a columnar form. The lengths of all of the [`TimeColumn`] and the component columns
1193    /// must match. All data that occurs at the same index across the different index/time and components
1194    /// arrays will act as a single logical row.
1195    ///
1196    /// Note that this API ignores any stateful index/time set on the log stream via the
1197    /// [`Self::set_time`]/[`Self::set_timepoint`]/etc. APIs.
1198    /// Furthermore, this will _not_ inject the default timelines `log_tick` and `log_time` timeline columns.
1199    pub fn send_columns(
1200        &self,
1201        ent_path: impl Into<EntityPath>,
1202        indexes: impl IntoIterator<Item = TimeColumn>,
1203        columns: impl IntoIterator<Item = SerializedComponentColumn>,
1204    ) -> RecordingStreamResult<()> {
1205        let id = ChunkId::new();
1206
1207        let indexes = indexes
1208            .into_iter()
1209            .map(|col| (*col.timeline().name(), col))
1210            .collect();
1211
1212        let components: ChunkComponents = columns
1213            .into_iter()
1214            .map(|column| (column.descriptor, column.list_array))
1215            .collect();
1216
1217        let chunk = Chunk::from_auto_row_ids(id, ent_path.into(), indexes, components)?;
1218
1219        self.send_chunk(chunk);
1220
1221        Ok(())
1222    }
1223
    /// Log static data to Rerun.
    ///
    /// It can be used to log anything
    /// that implements the [`AsComponents`] trait, such as any [archetype](https://docs.rs/rerun/latest/rerun/archetypes/index.html)
    /// or individual [component](https://docs.rs/rerun/latest/rerun/components/index.html).
    ///
    /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
    /// any temporal data of the same type.
    /// All timestamp data associated with this message will be dropped right before sending it to Rerun.
    ///
    /// This is most often used for [`rerun::ViewCoordinates`](https://docs.rs/rerun/latest/rerun/archetypes/struct.ViewCoordinates.html) and
    /// [`rerun::AnnotationContext`](https://docs.rs/rerun/latest/rerun/archetypes/struct.AnnotationContext.html).
    ///
    /// Internally, the stream will automatically micro-batch multiple log calls to optimize
    /// transport.
    /// See [SDK Micro Batching] for more information.
    ///
    /// This is equivalent to calling [`Self::log_with_static`] with `static_` set to `true`.
    /// See also [`Self::log`].
    ///
    /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
    #[inline]
    pub fn log_static<AS: ?Sized + AsComponents>(
        &self,
        ent_path: impl Into<EntityPath>,
        as_components: &AS,
    ) -> RecordingStreamResult<()> {
        self.log_with_static(ent_path, true, as_components)
    }
1253
1254    /// Logs the contents of a [component bundle] into Rerun.
1255    ///
1256    /// If `static_` is set to `true`, all timestamp data associated with this message will be
1257    /// dropped right before sending it to Rerun.
1258    /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
1259    /// any temporal data of the same type.
1260    ///
1261    /// Otherwise, the data will be timestamped automatically based on the [`RecordingStream`]'s
1262    /// internal clock.
1263    /// See `RecordingStream::set_time_*` family of methods for more information.
1264    ///
1265    /// The entity path can either be a string
1266    /// (with special characters escaped, split on unescaped slashes)
1267    /// or an [`EntityPath`] constructed with [`crate::entity_path`].
1268    /// See <https://www.rerun.io/docs/concepts/logging-and-ingestion/entity-path> for more on entity paths.
1269    ///
1270    /// Internally, the stream will automatically micro-batch multiple log calls to optimize
1271    /// transport.
1272    /// See [SDK Micro Batching] for more information.
1273    ///
1274    /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
1275    /// [component bundle]: [`AsComponents`]
1276    #[inline]
1277    pub fn log_with_static<AS: ?Sized + AsComponents>(
1278        &self,
1279        ent_path: impl Into<EntityPath>,
1280        static_: bool,
1281        as_components: &AS,
1282    ) -> RecordingStreamResult<()> {
1283        let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
1284        self.log_serialized_batches_impl(
1285            row_id,
1286            ent_path,
1287            static_,
1288            as_components.as_serialized_batches(),
1289        )
1290    }
1291
    /// Logs a set of [`SerializedComponentBatch`]es into Rerun.
    ///
    /// If `static_` is set to `true`, all timestamp data associated with this message will be
    /// dropped right before sending it to Rerun.
    /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
    /// any temporal data of the same type.
    ///
    /// Otherwise, the data will be timestamped automatically based on the [`RecordingStream`]'s
    /// internal clock.
    /// See `RecordingStream::set_time_*` family of methods for more information.
    ///
    /// The number of instances will be determined by the longest batch in the bundle.
    ///
    /// The entity path can either be a string
    /// (with special characters escaped, split on unescaped slashes)
    /// or an [`EntityPath`] constructed with [`crate::entity_path`].
    /// See <https://www.rerun.io/docs/concepts/logging-and-ingestion/entity-path> for more on entity paths.
    ///
    /// Internally, the stream will automatically micro-batch multiple log calls to optimize
    /// transport.
    /// See [SDK Micro Batching] for more information.
    ///
    /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
    ///
    /// [`SerializedComponentBatch`]: re_sdk_types::SerializedComponentBatch
    pub fn log_serialized_batches(
        &self,
        ent_path: impl Into<EntityPath>,
        static_: bool,
        comp_batches: impl IntoIterator<Item = re_sdk_types::SerializedComponentBatch>,
    ) -> RecordingStreamResult<()> {
        let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
        self.log_serialized_batches_impl(row_id, ent_path, static_, comp_batches)
    }
1326
1327    /// Sends a property to the recording.
1328    #[inline]
1329    pub fn send_property<AS: ?Sized + AsComponents>(
1330        &self,
1331        name: impl Into<String>,
1332        values: &AS,
1333    ) -> RecordingStreamResult<()> {
1334        let sub_path = EntityPath::from(name.into());
1335        self.log_static(EntityPath::properties().join(&sub_path), values)
1336    }
1337
1338    /// Sends the name of the recording.
1339    #[inline]
1340    pub fn send_recording_name(&self, name: impl Into<String>) -> RecordingStreamResult<()> {
1341        let update = RecordingInfo::update_fields().with_name(name.into());
1342        self.log_static(EntityPath::properties(), &update)
1343    }
1344
1345    /// Sends the start time of the recording.
1346    #[inline]
1347    pub fn send_recording_start_time(
1348        &self,
1349        timestamp: impl Into<Timestamp>,
1350    ) -> RecordingStreamResult<()> {
1351        let update = RecordingInfo::update_fields().with_start_time(timestamp.into());
1352        self.log_static(EntityPath::properties(), &update)
1353    }
1354
1355    // NOTE: For bw and fw compatibility reasons, we need our logging APIs to be fallible, even
1356    // though they really aren't at the moment.
1357    #[expect(clippy::unnecessary_wraps)]
1358    fn log_serialized_batches_impl(
1359        &self,
1360        row_id: RowId,
1361        entity_path: impl Into<EntityPath>,
1362        static_: bool,
1363        comp_batches: impl IntoIterator<Item = re_sdk_types::SerializedComponentBatch>,
1364    ) -> RecordingStreamResult<()> {
1365        if !self.is_enabled() {
1366            return Ok(()); // silently drop the message
1367        }
1368
1369        let entity_path = entity_path.into();
1370
1371        let components: IntMap<_, _> = comp_batches
1372            .into_iter()
1373            .map(|comp_batch| (comp_batch.descriptor.component, comp_batch))
1374            .collect();
1375
1376        // NOTE: The timepoint is irrelevant, the `RecordingStream` will overwrite it using its
1377        // internal clock.
1378        let timepoint = TimePoint::default();
1379
1380        if !components.is_empty() {
1381            let row = PendingRow {
1382                row_id,
1383                timepoint,
1384                components,
1385            };
1386            self.record_row(entity_path, row, !static_);
1387        }
1388
1389        Ok(())
1390    }
1391
    /// Logs the file at the given `path` using all [`re_importer::Importer`]s available.
    ///
    /// A single `path` might be handled by more than one importer.
    ///
    /// This method blocks until either at least one [`re_importer::Importer`] starts
    /// streaming data in or all of them fail.
    ///
    /// - `filepath`: the file to import; its contents are read by the importers.
    /// - `entity_path_prefix`: optional prefix forwarded to the importers' settings.
    /// - `static_`: if `true`, no timepoint is forwarded to the importers.
    ///
    /// The importers are configured to prefer logging into the current recording.
    ///
    /// See <https://www.rerun.io/docs/concepts/logging-and-ingestion/importers/overview> for more information.
    #[cfg(feature = "importers")]
    pub fn log_file_from_path(
        &self,
        filepath: impl AsRef<std::path::Path>,
        entity_path_prefix: Option<EntityPath>,
        static_: bool,
    ) -> RecordingStreamResult<()> {
        self.log_file(filepath, None, entity_path_prefix, static_, true)
    }
1409
    /// Logs the given `contents` using all [`re_importer::Importer`]s available.
    ///
    /// A single `path` might be handled by more than one importer.
    ///
    /// This method blocks until either at least one [`re_importer::Importer`] starts
    /// streaming data in or all of them fail.
    ///
    /// - `filepath`: the path associated with `contents`; it is forwarded to the importers
    ///   alongside the contents.
    /// - `contents`: the raw bytes to import.
    /// - `entity_path_prefix`: optional prefix forwarded to the importers' settings.
    /// - `static_`: if `true`, no timepoint is forwarded to the importers.
    ///
    /// The importers are configured to prefer logging into the current recording.
    ///
    /// See <https://www.rerun.io/docs/concepts/logging-and-ingestion/importers/overview> for more information.
    #[cfg(feature = "importers")]
    pub fn log_file_from_contents(
        &self,
        filepath: impl AsRef<std::path::Path>,
        contents: std::borrow::Cow<'_, [u8]>,
        entity_path_prefix: Option<EntityPath>,
        static_: bool,
    ) -> RecordingStreamResult<()> {
        self.log_file(filepath, Some(contents), entity_path_prefix, static_, true)
    }
1428
    /// Shared implementation of [`Self::log_file_from_path`] and
    /// [`Self::log_file_from_contents`].
    ///
    /// Runs the importers on `filepath` (or on `contents`, if provided), then spawns a thread
    /// that forwards every log message they produce into this recording. The thread's handle is
    /// registered so that it gets joined before the recording shuts down.
    ///
    /// If `prefer_current_recording` is set (which is always the case for now), the importer settings
    /// will be configured as if the current SDK recording is the currently opened recording.
    /// Most importers prefer logging to the currently opened recording if one is set.
    ///
    /// If the stream has no store info (e.g. it is disabled), a warning is logged and `Ok(())`
    /// is returned without importing anything.
    ///
    /// # Errors
    ///
    /// Fails if the importers report an error, or if the forwarding thread cannot be spawned.
    #[cfg(feature = "importers")]
    #[expect(clippy::fn_params_excessive_bools)] // private function 🤷‍♂️
    fn log_file(
        &self,
        filepath: impl AsRef<std::path::Path>,
        contents: Option<std::borrow::Cow<'_, [u8]>>,
        entity_path_prefix: Option<EntityPath>,
        static_: bool,
        prefer_current_recording: bool,
    ) -> RecordingStreamResult<()> {
        let Some(store_info) = self.store_info().clone() else {
            re_log::warn!(
                "Ignored call to log_file() because RecordingStream has not been properly initialized"
            );
            return Ok(());
        };

        let filepath = filepath.as_ref();
        // Remember this now: `contents` is moved into the importer below, but we still need to
        // know which entry point we came from in order to name the thread.
        let has_contents = contents.is_some();

        let (tx, rx) = re_log_channel::log_channel(re_log_channel::LogSource::File {
            path: filepath.into(),
            follow: false,
        });

        let mut settings = crate::ImporterSettings {
            application_id: Some(store_info.application_id().clone()),
            recording_id: store_info.recording_id().clone(),
            opened_store_id: None,
            force_store_info: false,
            entity_path_prefix,
            follow: false,
            // Static data gets no timepoint at all.
            timepoint: (!static_).then(|| {
                self.with(|inner| {
                    // Get the current time on all timelines, for the current recording, on the current
                    // thread…
                    let mut now = self.now();

                    // …and then also inject the current recording tick into it.
                    let tick = inner
                        .tick
                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    now.insert_cell(TimelineName::log_tick(), TimeCell::from_sequence(tick));

                    now
                })
                .unwrap_or_default()
            }),
            timestamp_offset_ns: None,
            timeline_type: re_log_types::TimeType::TimestampNs,
        };

        if prefer_current_recording {
            settings.opened_store_id = Some(store_info.store_id);
        }

        if let Some(contents) = contents {
            re_importer::import_from_file_contents(
                &settings,
                re_log_types::FileSource::Sdk,
                filepath,
                contents,
                &tx,
            )?;
        } else {
            re_importer::import_from_path(&settings, re_log_types::FileSource::Sdk, filepath, &tx)?;
        }
        // Drop our own sender.
        // NOTE(review): presumably the importers keep their own clones of `tx`, so that the
        // channel closes (and the loop below ends) once they are all done -- confirm.
        drop(tx);

        let thread_name = if has_contents {
            format!("log_file_from_contents({filepath:?})")
        } else {
            format!("log_file_from_path({filepath:?})")
        };
        let handle = std::thread::Builder::new()
            .name(thread_name.clone())
            .spawn({
                // Only hold a weak handle, so that this thread doesn't keep the recording alive.
                let this = self.clone_weak();
                move || {
                    // We can safely ignore the error on `recv()` as we're in complete control of
                    // both ends of the channel.
                    while let Some(msg) = rx.recv().ok().and_then(|msg| msg.into_data()) {
                        match msg {
                            re_log_channel::DataSourceMessage::LogMsg(log_msg) => {
                                this.record_msg(log_msg);
                            }
                            unsupported => {
                                re_log::error_once!(
                                    "Ignoring unexpected {} in file",
                                    unsupported.variant_name()
                                );
                            }
                        }
                    }
                }
            })
            .map_err(|err| RecordingStreamError::SpawnThread {
                name: thread_name,
                err,
            })?;

        // Register the thread so that it gets joined before the recording shuts down.
        self.with(|inner| inner.importer_handles.lock().push(handle));

        Ok(())
    }
1537}
1538
1539#[expect(clippy::needless_pass_by_value)]
1540fn forwarding_thread(
1541    store_info: StoreInfo,
1542    mut sink: Box<dyn LogSink>,
1543    cmds_rx: re_quota_channel::Receiver<Command>,
1544    chunks: re_quota_channel::Receiver<Chunk>,
1545    on_release: Option<ArrowRecordBatchReleaseCallback>,
1546) {
1547    /// Returns `true` to indicate that processing can continue; i.e. `false` means immediate
1548    /// shutdown.
1549    fn handle_cmd(store_info: &StoreInfo, cmd: Command, sink: &mut Box<dyn LogSink>) -> bool {
1550        match cmd {
1551            Command::RecordMsg(msg) => {
1552                sink.send(msg);
1553            }
1554            Command::SwapSink { new_sink, timeout } => {
1555                re_log::trace!("Swapping sink…");
1556
1557                let backlog = {
1558                    // Capture the backlog if it exists.
1559                    let backlog = sink.drain_backlog();
1560
1561                    // Flush the underlying sink if possible.
1562                    if let Err(err) = sink.flush_blocking(timeout) {
1563                        re_log::error!("Failed to flush previous sink: {err}");
1564                    }
1565
1566                    backlog
1567                };
1568
1569                // Send the recording info to the new sink. This is idempotent.
1570                {
1571                    re_log::debug!(
1572                        store_id = ?store_info.store_id,
1573                        "Setting StoreInfo",
1574                    );
1575                    new_sink.send(
1576                        re_log_types::SetStoreInfo {
1577                            row_id: *RowId::new(),
1578                            info: store_info.clone(),
1579                        }
1580                        .into(),
1581                    );
1582                    new_sink.send_all(backlog);
1583                }
1584
1585                *sink = new_sink;
1586            }
1587            Command::InspectSink(f) => {
1588                f(sink.as_ref());
1589            }
1590            Command::Flush { on_done, timeout } => {
1591                re_log::trace!("Flushing…");
1592
1593                let result = sink.flush_blocking(timeout);
1594
1595                // Send back the result:
1596                if let Err(crossbeam::channel::SendError(result)) = send_crossbeam(&on_done, result)
1597                    && let Err(err) = result
1598                {
1599                    // There was an error, and nobody received it:
1600                    re_log::error!("Failed to flush sink: {err}");
1601                }
1602            }
1603            Command::FinalizeDeferredSinks { on_done, timeout } => {
1604                if sink.defers_finalization_to_shutdown() && !sink.finalize_deferred_in_place() {
1605                    // Composite sinks handle finalization themselves; otherwise the entire
1606                    // sink defers finalization (e.g. a bare FileSink), so we have to swap it
1607                    // out for a BufferedSink — dropping the old sink runs its writer thread
1608                    // to completion and emits the footer.
1609                    let backlog = sink.drain_backlog();
1610                    if let Err(err) = sink.flush_blocking(timeout) {
1611                        re_log::error!("Failed to flush previous sink during finalize: {err}");
1612                    }
1613
1614                    let new_sink: Box<dyn LogSink> = Box::new(crate::log_sink::BufferedSink::new());
1615                    new_sink.send(
1616                        re_log_types::SetStoreInfo {
1617                            row_id: *RowId::new(),
1618                            info: store_info.clone(),
1619                        }
1620                        .into(),
1621                    );
1622                    new_sink.send_all(backlog);
1623                    *sink = new_sink;
1624                }
1625                send_crossbeam(&on_done, ()).ok();
1626            }
1627            Command::PopPendingChunks => {
1628                // Wake up and skip the current iteration so that we can drain all pending chunks
1629                // before handling the next command.
1630            }
1631            Command::Shutdown => return false,
1632        }
1633
1634        true
1635    }
1636
1637    loop {
1638        // NOTE: Always pop chunks first, this is what makes `Command::PopPendingChunks` possible,
1639        // which in turns makes `RecordingStream::flush_blocking` well defined.
1640        while let Ok(chunk) = chunks.try_recv() {
1641            let mut msg = match chunk.to_arrow_msg() {
1642                Ok(chunk) => chunk,
1643                Err(err) => {
1644                    re_log::error!(%err, "couldn't serialize chunk; data dropped (this is a bug in Rerun!)");
1645                    continue;
1646                }
1647            };
1648            msg.on_release = on_release.clone();
1649            sink.send(LogMsg::ArrowMsg(store_info.store_id.clone(), msg));
1650        }
1651
1652        re_quota_channel::select! {
1653            recv(chunks) -> res => {
1654                let Ok(chunk) = res else {
1655                    // The batcher is gone, which can only happen if the `RecordingStream` itself
1656                    // has been dropped.
1657                    re_log::trace!("Shutting down forwarding_thread: batcher is gone");
1658                    break;
1659                };
1660
1661                let msg = match chunk.to_arrow_msg() {
1662                    Ok(chunk) => chunk,
1663                    Err(err) => {
1664                        re_log::error!(%err, "couldn't serialize chunk; data dropped (this is a bug in Rerun!)");
1665                        continue;
1666                    }
1667                };
1668
1669                sink.send(LogMsg::ArrowMsg(store_info.store_id.clone(), msg));
1670            }
1671
1672            recv(cmds_rx) -> res => {
1673                let Ok(cmd) = res else {
1674                    // All command senders are gone, which can only happen if the
1675                    // `RecordingStream` itself has been dropped.
1676                    re_log::trace!("Shutting down forwarding_thread: all command senders are gone");
1677                    break;
1678                };
1679                if !handle_cmd(&store_info, cmd, &mut sink) {
1680                    break; // shutdown
1681                }
1682            }
1683        }
1684
1685        // NOTE: The receiving end of the command stream is owned solely by this thread.
1686        // Past this point, all command writes will return `ErrDisconnected`.
1687    }
1688}
1689
1690impl RecordingStream {
1691    /// Check if logging is enabled on this `RecordingStream`.
1692    ///
1693    /// If not, all recording calls will be ignored.
1694    #[inline]
1695    pub fn is_enabled(&self) -> bool {
1696        self.with(|_| true).unwrap_or(false)
1697    }
1698
1699    /// The [`StoreInfo`] associated with this `RecordingStream`.
1700    #[inline]
1701    pub fn store_info(&self) -> Option<StoreInfo> {
1702        self.with(|inner| inner.store_info.clone())
1703    }
1704
1705    /// Determine whether a fork has happened since creating this `RecordingStream`. In general, this means our
1706    /// batcher/sink threads are gone and all data logged since the fork has been dropped.
1707    ///
1708    /// It is essential that [`crate::cleanup_if_forked_child`] be called after forking the process. SDK-implementations
1709    /// should do this during their initialization phase.
1710    #[inline]
1711    pub fn is_forked_child(&self) -> bool {
1712        self.with(|inner| inner.is_forked_child()).unwrap_or(false)
1713    }
1714}
1715
1716impl RecordingStream {
1717    /// Records an arbitrary [`LogMsg`].
1718    #[inline]
1719    pub fn record_msg(&self, msg: LogMsg) {
1720        let f = move |inner: &RecordingStreamInner| {
1721            // NOTE: Internal channels can never be closed outside of the `Drop` impl, this send cannot
1722            // fail.
1723            inner.cmds_tx.send(Command::RecordMsg(msg)).ok();
1724            inner
1725                .tick
1726                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1727        };
1728
1729        if self.with(f).is_none() {
1730            re_log::warn_once!("Recording disabled - call to record_msg() ignored");
1731        }
1732    }
1733
1734    /// Records a single [`PendingRow`].
1735    ///
1736    /// If `inject_time` is set to `true`, the row's timestamp data will be overridden using the
1737    /// [`RecordingStream`]'s internal clock.
1738    ///
1739    /// Internally, incoming [`PendingRow`]s are automatically coalesced into larger [`Chunk`]s to
1740    /// optimize for transport.
1741    #[inline]
1742    pub fn record_row(&self, entity_path: EntityPath, mut row: PendingRow, inject_time: bool) {
1743        let f = move |inner: &RecordingStreamInner| {
1744            // NOTE: We're incrementing the current tick still.
1745            let tick = inner
1746                .tick
1747                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1748            if inject_time {
1749                // Get the current time on all timelines, for the current recording, on the current
1750                // thread…
1751                let mut now = self.now();
1752                // …and then also inject the current recording tick into it.
1753                now.insert_cell(TimelineName::log_tick(), TimeCell::from_sequence(tick));
1754
1755                // Inject all these times into the row, overriding conflicting times, if any.
1756                for (timeline, cell) in now {
1757                    row.timepoint.insert_cell(timeline, cell);
1758                }
1759            }
1760
1761            inner.batcher.push_row(entity_path, row);
1762        };
1763
1764        if self.with(f).is_none() {
1765            re_log::warn_once!("Recording disabled - call to record_row() ignored");
1766        }
1767    }
1768
1769    /// Logs a single [`Chunk`].
1770    ///
1771    /// Will inject `log_tick` and `log_time` timeline columns into the chunk.
1772    /// If you don't want to inject these, use [`Self::send_chunk`] instead.
1773    #[inline]
1774    pub fn log_chunk(&self, mut chunk: Chunk) {
1775        let f = move |inner: &RecordingStreamInner| {
1776            // TODO(cmc): Repeating these values is pretty wasteful. Would be nice to have a way of
1777            // indicating these are fixed across the whole chunk.
1778            // Inject the log time
1779            {
1780                let time_timeline = Timeline::log_time();
1781                let time =
1782                    TimeInt::new_temporal(re_log_types::Timestamp::now().nanos_since_epoch());
1783
1784                let repeated_time = std::iter::repeat_n(time.as_i64(), chunk.num_rows()).collect();
1785
1786                let time_column = TimeColumn::new(Some(true), time_timeline, repeated_time);
1787
1788                if let Err(err) = chunk.add_timeline(time_column) {
1789                    re_log::error!(
1790                        "Couldn't inject '{}' timeline into chunk (this is a bug in Rerun!): {}",
1791                        time_timeline.name(),
1792                        err
1793                    );
1794                    return;
1795                }
1796            }
1797            // Inject the log tick
1798            {
1799                let tick_timeline = Timeline::log_tick();
1800
1801                let tick = inner
1802                    .tick
1803                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1804
1805                let repeated_tick = std::iter::repeat_n(tick, chunk.num_rows()).collect();
1806
1807                let tick_chunk = TimeColumn::new(Some(true), tick_timeline, repeated_tick);
1808
1809                if let Err(err) = chunk.add_timeline(tick_chunk) {
1810                    re_log::error!(
1811                        "Couldn't inject '{}' timeline into chunk (this is a bug in Rerun!): {}",
1812                        tick_timeline.name(),
1813                        err
1814                    );
1815                    return;
1816                }
1817            }
1818
1819            inner.batcher.push_chunk(chunk);
1820        };
1821
1822        if self.with(f).is_none() {
1823            re_log::warn_once!("Recording disabled - call to log_chunk() ignored");
1824        }
1825    }
1826
1827    /// Logs multiple [`Chunk`]s.
1828    ///
1829    /// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunk,
1830    /// for that use [`Self::log_chunks`].
1831    pub fn log_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
1832        for chunk in chunks {
1833            self.log_chunk(chunk);
1834        }
1835    }
1836
1837    /// Records a single [`Chunk`].
1838    ///
1839    /// Will inject `log_tick` and `log_time` timeline columns into the chunk.
1840    /// If you don't want to inject these, use [`Self::send_chunks`] instead.
1841    #[inline]
1842    pub fn send_chunk(&self, chunk: Chunk) {
1843        let f = move |inner: &RecordingStreamInner| {
1844            inner.batcher.push_chunk(chunk);
1845        };
1846
1847        if self.with(f).is_none() {
1848            re_log::warn_once!("Recording disabled - call to send_chunk() ignored");
1849        }
1850    }
1851
1852    /// Records multiple [`Chunk`]s.
1853    ///
1854    /// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunk,
1855    /// for that use [`Self::log_chunks`].
1856    pub fn send_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
1857        for chunk in chunks {
1858            self.send_chunk(chunk);
1859        }
1860    }
1861
1862    /// Swaps the underlying sink for a new one.
1863    ///
1864    /// This guarantees that:
1865    /// 1. all pending rows and chunks are batched, collected and sent down the current sink,
1866    /// 2. the current sink is flushed if it has pending data in its buffers,
1867    /// 3. the current sink's backlog, if there's any, is forwarded to the new sink.
1868    ///
1869    /// When this function returns, the calling thread is guaranteed that all future record calls
1870    /// will end up in the new sink.
1871    ///
1872    /// If the batcher's configuration has not been set explicitly or by environment variables,
1873    /// this will change the batcher configuration to the sink's default configuration.
1874    ///
1875    /// ## Data loss
1876    ///
1877    /// If the current sink is in a broken state (e.g. a gRPC sink with a broken connection that
1878    /// cannot be repaired), all pending data in its buffers will be dropped.
1879    pub fn set_sink(&self, new_sink: Box<dyn LogSink>) {
1880        if self.is_forked_child() {
1881            re_log::error_once!(
1882                "Fork detected during set_sink. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
1883            );
1884            return;
1885        }
1886
1887        let timeout = Duration::MAX; // The background thread should block forever if necessary
1888
1889        let f = move |inner: &RecordingStreamInner| {
1890            // NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends
1891            // are safe.
1892
1893            // Flush the batcher down the chunk channel
1894            if let Err(err) = inner.batcher.flush_blocking(timeout) {
1895                re_log::warn!("Failed to flush batcher in `set_sink`: {err}");
1896            }
1897
1898            // Receive pending chunks from the batcher's channel
1899            inner.cmds_tx.send(Command::PopPendingChunks).ok();
1900
1901            // Update the batcher's configuration if it's sink-dependent.
1902            if inner.sink_dependent_batcher_config {
1903                let batcher_config = resolve_batcher_config(None, &*new_sink);
1904                inner.batcher.update_config(batcher_config);
1905                *inner.current_batcher_config.lock() = batcher_config;
1906            }
1907
1908            warn_if_problematic_file_sink_config(&inner.current_batcher_config.lock(), &*new_sink);
1909
1910            // Swap the sink, which will internally make sure to re-ingest the backlog if needed
1911            inner
1912                .cmds_tx
1913                .send(Command::SwapSink { new_sink, timeout })
1914                .ok();
1915
1916            // Before we give control back to the caller, we need to make sure that the swap has
1917            // taken place: we don't want the user to send data to the old sink!
1918            re_log::trace!("Waiting for sink swap to complete…");
1919            let (cmd, oneshot) = Command::flush(timeout);
1920            inner.cmds_tx.send(cmd).ok();
1921            oneshot.recv().ok();
1922            re_log::trace!("Sink swap completed.");
1923        };
1924
1925        if self.with(f).is_none() {
1926            re_log::warn_once!("Recording disabled - call to set_sink() ignored");
1927        }
1928    }
1929
1930    /// Initiates a flush of the pipeline and returns immediately.
1931    ///
1932    /// This does **not** wait for the flush to propagate (see [`Self::flush_blocking`]).
1933    /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
1934    ///
1935    /// This will never return [`SinkFlushError::Timeout`].
1936    pub fn flush_async(&self) -> Result<(), SinkFlushError> {
1937        re_tracing::profile_function!();
1938        match self.flush(None) {
1939            Err(SinkFlushError::Timeout) => Ok(()),
1940            result => result,
1941        }
1942    }
1943
1944    /// Flush the batching pipeline and waits for it to propagate.
1945    ///
1946    /// The function will block until either the flush has completed successfully (`Ok`),
1947    /// an error has occurred (`SinkFlushError::Failed`), or the timeout is reached (`SinkFlushError::Timeout`).
1948    ///
1949    /// Convenience for calling [`Self::flush_with_timeout`] with a timeout of [`Duration::MAX`]
1950    pub fn flush_blocking(&self) -> Result<(), SinkFlushError> {
1951        re_tracing::profile_function!();
1952        self.flush_with_timeout(Duration::MAX)
1953    }
1954
1955    /// Flush the batching pipeline and optionally waits for it to propagate.
1956    /// If you don't want a timeout you can pass in [`Duration::MAX`].
1957    ///
1958    /// The function will block until that timeout is reached,
1959    /// an error occurs, or the flush is complete.
1960    /// The function will only block while there is some hope of progress.
1961    /// For instance: if the underlying gRPC connection is disconnected (or never connected at all),
1962    /// then [`SinkFlushError::Failed`] is returned.
1963    ///
1964    /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
1965    pub fn flush_with_timeout(&self, timeout: Duration) -> Result<(), SinkFlushError> {
1966        re_tracing::profile_function!();
1967        self.flush(Some(timeout))
1968    }
1969
1970    /// Flush the batching pipeline and optionally waits for it to propagate.
1971    ///
1972    /// If `timeout` is `None`, then this function will start the flush, but NOT wait for it to finish.
1973    ///
1974    /// If a `timeout` is given, then the function will block until that timeout is reached,
1975    /// an error occurs, or the flush is complete.
1976    ///
1977    /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
1978    fn flush(&self, timeout: Option<Duration>) -> Result<(), SinkFlushError> {
1979        if self.is_forked_child() {
1980            return Err(SinkFlushError::failed(
1981                "Fork detected during flush. cleanup_if_forked() should always be called after forking. This is likely a bug in the Rerun SDK.",
1982            ));
1983        }
1984
1985        let f = move |inner: &RecordingStreamInner| -> Result<(), SinkFlushError> {
1986            // 0. Wait for all pending importer threads to complete
1987            //
1988            // This ensures that data from `log_file_from_path` and `log_file_from_contents`
1989            // is fully loaded before we flush the batcher and sink.
1990            inner.wait_for_importers();
1991
1992            // 1. Synchronously flush the batcher down the chunk channel
1993            //
1994            // NOTE: This _has_ to be done synchronously as we need to be guaranteed that all chunks
1995            // are ready to be drained by the time this call returns.
1996            // It cannot block indefinitely and is fairly fast as it only requires compute (no I/O).
1997            inner
1998                .batcher
1999                .flush_blocking(Duration::MAX)
2000                .map_err(|err| match err {
2001                    BatcherFlushError::Closed => SinkFlushError::failed(err.to_string()),
2002                    BatcherFlushError::Timeout => SinkFlushError::Timeout,
2003                })?;
2004
2005            // 2. Drain all pending chunks from the batcher's channel _before_ any other future command
2006            inner
2007                .cmds_tx
2008                .send(Command::PopPendingChunks)
2009                .map_err(|_ignored| {
2010                    SinkFlushError::failed(
2011                        "Sink shut down prematurely. This is likely a bug in the Rerun SDK.",
2012                    )
2013                })?;
2014
2015            // 3. Asynchronously flush everything down the sink
2016            let (cmd, on_done) = Command::flush(Duration::MAX); // The background thread should block forever if necessary
2017            inner.cmds_tx.send(cmd).map_err(|_ignored| {
2018                SinkFlushError::failed(
2019                    "Sink shut down prematurely. This is likely a bug in the Rerun SDK.",
2020                )
2021            })?;
2022
2023            if let Some(timeout) = timeout {
2024                on_done.recv_timeout(timeout).map_err(|err| match err {
2025                    RecvTimeoutError::Timeout => SinkFlushError::Timeout,
2026                    RecvTimeoutError::Disconnected => SinkFlushError::failed(
2027                        "Flush never finished. This is likely a bug in the Rerun SDK.",
2028                    ),
2029                })??;
2030            }
2031
2032            Ok(())
2033        };
2034
2035        match self.with(f) {
2036            Some(Ok(())) => Ok(()),
2037            Some(Err(err)) => Err(err),
2038            None => {
2039                re_log::warn_once!("Recording disabled - call to flush ignored");
2040                Ok(())
2041            }
2042        }
2043    }
2044}
2045
2046impl RecordingStream {
2047    /// Stream data to multiple different sinks.
2048    ///
2049    /// This is semantically the same as calling [`RecordingStream::set_sink`], but the resulting
2050    /// [`RecordingStream`] will now stream data to multiple sinks at the same time.
2051    ///
2052    /// Currently only supports [`GrpcSink`][grpc_sink] and [`FileSink`][file_sink].
2053    ///
2054    /// If the batcher's configuration has not been set explicitly or by environment variables,
2055    /// This will take over a conservative default of the new sinks.
2056    /// (there's no guarantee on when exactly the new configuration will be active)
2057    ///
2058    /// [grpc_sink]: crate::sink::GrpcSink
2059    /// [file_sink]: crate::sink::FileSink
2060    pub fn set_sinks(&self, sinks: impl crate::log_sink::IntoMultiSink) {
2061        if forced_sink_path().is_some() {
2062            re_log::debug!("Ignored setting new MultiSink since {ENV_FORCE_SAVE} is set");
2063            return;
2064        }
2065
2066        let sink = sinks.into_multi_sink();
2067
2068        self.set_sink(Box::new(sink));
2069    }
2070
2071    /// Asynchronously calls a method that has read access to the currently active sink.
2072    ///
2073    /// Since a recording stream's sink is owned by a different thread there is no guarantee when
2074    /// the callback is going to be called.
2075    /// It's advised to return as quickly as possible from the callback since
2076    /// as long as the callback doesn't return, the sink will not receive any new data,
2077    ///
2078    /// # Experimental
2079    ///
2080    /// This is an experimental API and may change in future releases.
2081    // TODO(#10444): This should become a lot more straight forward with explicit sinks.
2082    pub fn inspect_sink(&self, f: impl FnOnce(&dyn LogSink) + Send + 'static) {
2083        self.with(|inner| inner.cmds_tx.send(Command::InspectSink(Box::new(f))).ok());
2084    }
2085
2086    /// Swaps the underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to use
2087    /// the specified address.
2088    ///
2089    /// See also [`Self::connect_grpc_opts`] if you wish to configure the connection.
2090    ///
2091    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2092    /// terms of data durability and ordering.
2093    /// See [`Self::set_sink`] for more information.
2094    pub fn connect_grpc(&self) -> RecordingStreamResult<()> {
2095        self.connect_grpc_opts(format!(
2096            "rerun+http://127.0.0.1:{}/proxy",
2097            crate::DEFAULT_SERVER_PORT
2098        ))
2099    }
2100
2101    /// Swaps the underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to use
2102    /// the specified address.
2103    ///
2104    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2105    /// terms of data durability and ordering.
2106    /// See [`Self::set_sink`] for more information.
2107    ///
2108    /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
2109    /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
2110    /// call to `flush` to block indefinitely if a connection cannot be established.
2111    pub fn connect_grpc_opts(&self, url: impl Into<String>) -> RecordingStreamResult<()> {
2112        if forced_sink_path().is_some() {
2113            re_log::debug!("Ignored setting new GrpcSink since {ENV_FORCE_SAVE} is set");
2114            return Ok(());
2115        }
2116
2117        let url: String = url.into();
2118        let re_uri::RedapUri::Proxy(uri) = url.as_str().parse()? else {
2119            return Err(RecordingStreamError::NotAProxyEndpoint);
2120        };
2121
2122        let sink = crate::log_sink::GrpcSink::new(uri);
2123
2124        self.set_sink(Box::new(sink));
2125        Ok(())
2126    }
2127
2128    #[cfg(feature = "server")]
2129    /// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] pre-configured to listen on
2130    /// `rerun+http://127.0.0.1:9876/proxy`.
2131    ///
2132    /// To configure the gRPC server's IP and port, use [`Self::serve_grpc_opts`] instead.
2133    ///
2134    /// You can connect a viewer to it with `rerun --connect`.
2135    ///
2136    /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
2137    /// You can limit the amount of data buffered by the gRPC server with the `server_options` argument.
2138    /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
2139    pub fn serve_grpc(
2140        &self,
2141        server_options: re_grpc_server::ServerOptions,
2142    ) -> RecordingStreamResult<()> {
2143        self.serve_grpc_opts("0.0.0.0", crate::DEFAULT_SERVER_PORT, server_options)
2144    }
2145
2146    #[cfg(feature = "server")]
2147    /// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] pre-configured to listen on
2148    /// `rerun+http://{bind_ip}:{port}/proxy`.
2149    ///
2150    /// `0.0.0.0` is a good default for `bind_ip`.
2151    ///
2152    /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
2153    /// You can limit the amount of data buffered by the gRPC server with the `server_options` argument.
2154    /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
2155    pub fn serve_grpc_opts(
2156        &self,
2157        bind_ip: impl AsRef<str>,
2158        port: u16,
2159        server_options: re_grpc_server::ServerOptions,
2160    ) -> RecordingStreamResult<()> {
2161        if forced_sink_path().is_some() {
2162            re_log::debug!("Ignored setting GrpcServerSink since {ENV_FORCE_SAVE} is set");
2163            return Ok(());
2164        }
2165
2166        let sink = crate::grpc_server::GrpcServerSink::new(bind_ip.as_ref(), port, server_options)?;
2167
2168        self.set_sink(Box::new(sink));
2169        Ok(())
2170    }
2171
2172    /// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
2173    /// underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to send data to that
2174    /// new process.
2175    ///
2176    /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
2177    /// that viewer instead of starting a new one.
2178    ///
2179    /// See also [`Self::spawn_opts`] if you wish to configure the behavior of thew Rerun process
2180    /// as well as the underlying connection.
2181    ///
2182    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2183    /// terms of data durability and ordering.
2184    /// See [`Self::set_sink`] for more information.
2185    pub fn spawn(&self) -> RecordingStreamResult<()> {
2186        self.spawn_opts(&Default::default())
2187    }
2188
2189    /// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
2190    /// underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to send data to that
2191    /// new process.
2192    ///
2193    /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
2194    /// that viewer instead of starting a new one.
2195    ///
2196    /// The behavior of the spawned Viewer can be configured via `opts`.
2197    /// If you're fine with the default behavior, refer to the simpler [`Self::spawn`].
2198    ///
2199    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2200    /// terms of data durability and ordering.
2201    /// See [`Self::set_sink`] for more information.
2202    ///
2203    /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
2204    /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
2205    /// call to `flush` to block indefinitely if a connection cannot be established.
2206    pub fn spawn_opts(&self, opts: &crate::SpawnOptions) -> RecordingStreamResult<()> {
2207        if !self.is_enabled() {
2208            re_log::debug!("Rerun disabled - call to spawn() ignored");
2209            return Ok(());
2210        }
2211        if forced_sink_path().is_some() {
2212            re_log::debug!("Ignored setting new GrpcSink since {ENV_FORCE_SAVE} is set");
2213            return Ok(());
2214        }
2215
2216        let actual_port = crate::spawn(opts)?;
2217        let addr = std::net::SocketAddr::new(
2218            std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2219            actual_port,
2220        );
2221        self.connect_grpc_opts(format!("rerun+http://{addr}/proxy"))?;
2222
2223        Ok(())
2224    }
2225
2226    /// Swaps the underlying sink for a [`crate::sink::MemorySink`] sink and returns the associated
2227    /// [`MemorySinkStorage`].
2228    ///
2229    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2230    /// terms of data durability and ordering.
2231    /// See [`Self::set_sink`] for more information.
2232    pub fn memory(&self) -> MemorySinkStorage {
2233        let sink = crate::sink::MemorySink::new(self.clone());
2234        let storage = sink.buffer();
2235        self.set_sink(Box::new(sink));
2236        storage
2237    }
2238
2239    /// Swaps the underlying sink for a [`crate::sink::BinaryStreamSink`] sink and returns the associated
2240    /// [`BinaryStreamStorage`].
2241    ///
2242    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2243    /// terms of data durability and ordering.
2244    /// See [`Self::set_sink`] for more information.
2245    pub fn binary_stream(&self) -> BinaryStreamStorage {
2246        let (sink, storage) = crate::sink::BinaryStreamSink::new(self.clone());
2247        self.set_sink(Box::new(sink));
2248        storage
2249    }
2250
2251    /// Swaps the underlying sink for a [`crate::sink::FileSink`] at the specified `path`.
2252    ///
2253    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2254    /// terms of data durability and ordering.
2255    /// See [`Self::set_sink`] for more information.
2256    pub fn save(
2257        &self,
2258        path: impl Into<std::path::PathBuf>,
2259    ) -> Result<(), crate::sink::FileSinkError> {
2260        self.save_opts(path)
2261    }
2262
2263    /// Swaps the underlying sink for a [`crate::sink::FileSink`] at the specified `path`.
2264    ///
2265    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2266    /// terms of data durability and ordering.
2267    /// See [`Self::set_sink`] for more information.
2268    ///
2269    /// If a blueprint was provided, it will be stored first in the file.
2270    /// Blueprints are currently an experimental part of the Rust SDK.
2271    pub fn save_opts(
2272        &self,
2273        path: impl Into<std::path::PathBuf>,
2274    ) -> Result<(), crate::sink::FileSinkError> {
2275        if forced_sink_path().is_some() {
2276            re_log::debug!("Ignored setting new file since {ENV_FORCE_SAVE} is set");
2277            return Ok(());
2278        }
2279
2280        let sink = crate::sink::FileSink::new(path)?;
2281
2282        self.set_sink(Box::new(sink));
2283
2284        Ok(())
2285    }
2286
2287    /// Swaps the underlying sink for a [`crate::sink::FileSink`] pointed at stdout.
2288    ///
2289    /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
2290    /// default back to `buffered` mode, in order not to break the user's terminal.
2291    ///
2292    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2293    /// terms of data durability and ordering.
2294    /// See [`Self::set_sink`] for more information.
2295    pub fn stdout(&self) -> Result<(), crate::sink::FileSinkError> {
2296        self.stdout_opts()
2297    }
2298
2299    /// Swaps the underlying sink for a [`crate::sink::FileSink`] pointed at stdout.
2300    ///
2301    /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
2302    /// default back to `buffered` mode, in order not to break the user's terminal.
2303    ///
2304    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2305    /// terms of data durability and ordering.
2306    /// See [`Self::set_sink`] for more information.
2307    ///
2308    /// If a blueprint was provided, it will be stored first in the file.
2309    /// Blueprints are currently an experimental part of the Rust SDK.
2310    pub fn stdout_opts(&self) -> Result<(), crate::sink::FileSinkError> {
2311        if forced_sink_path().is_some() {
2312            re_log::debug!("Ignored setting new file since {ENV_FORCE_SAVE} is set");
2313            return Ok(());
2314        }
2315
2316        if std::io::stdout().is_terminal() {
2317            re_log::debug!("Ignored call to stdout() because stdout is a terminal");
2318            self.set_sink(Box::new(crate::log_sink::BufferedSink::new()));
2319            return Ok(());
2320        }
2321
2322        let sink = crate::sink::FileSink::stdout()?;
2323
2324        self.set_sink(Box::new(sink));
2325
2326        Ok(())
2327    }
2328
2329    /// Swaps the underlying sink for a [`crate::sink::BufferedSink`].
2330    ///
2331    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2332    /// terms of data durability and ordering.
2333    /// See [`Self::set_sink`] for more information.
2334    pub fn disconnect(&self) {
2335        let f = move |inner: &RecordingStreamInner| {
2336            // When disconnecting, we need to make sure that pending top-level importer threads that
2337            // were started from the SDK run to completion.
2338            inner.wait_for_importers();
2339            self.set_sink(Box::new(crate::sink::BufferedSink::new()));
2340        };
2341
2342        if self.with(f).is_none() {
2343            re_log::warn_once!("Recording disabled - call to disconnect() ignored");
2344        }
2345    }
2346
2347    /// Finalize any sinks whose on-disk format only completes at shutdown (i.e. file-like sinks
2348    /// that write a footer at the end), while leaving streaming sinks (e.g. gRPC) intact.
2349    ///
2350    /// For a bare deferring sink (e.g. `FileSink` from `save()`), this is equivalent to
2351    /// [`Self::disconnect`]: the sink is replaced with a [`crate::sink::BufferedSink`] and its
2352    /// `Drop` impl runs the writer thread to completion, which is what emits the footer.
2353    ///
2354    /// For a [`crate::log_sink::MultiSink`] containing a mix of deferring and streaming children,
2355    /// only the deferring children are dropped. The streaming children remain live and the
2356    /// `MultiSink` continues to receive new messages.
2357    ///
2358    /// For all other sinks this is a no-op.
2359    ///
2360    /// Used by Python's `RecordingStream.__exit__` so file-backed recordings are consumable as
2361    /// soon as the `with`-block exits, without waiting for `__del__` / GC.
2362    pub fn finalize_deferred_sinks(&self) {
2363        let timeout = Duration::MAX;
2364        let f = move |inner: &RecordingStreamInner| {
2365            inner.wait_for_importers();
2366
2367            // Flush the batcher down the chunk channel so any pending data lands in the sink
2368            // before we tear it down.
2369            if let Err(err) = inner.batcher.flush_blocking(timeout) {
2370                re_log::warn!("Failed to flush batcher in `finalize_deferred_sinks`: {err}");
2371            }
2372            inner.cmds_tx.send(Command::PopPendingChunks).ok();
2373
2374            let (cmd, oneshot) = Command::finalize_deferred_sinks(timeout);
2375            inner.cmds_tx.send(cmd).ok();
2376            oneshot.recv().ok();
2377        };
2378
2379        if self.with(f).is_none() {
2380            re_log::warn_once!("Recording disabled - call to finalize_deferred_sinks() ignored");
2381        }
2382    }
2383
2384    /// Send a blueprint through this recording stream.
2385    pub fn send_blueprint(
2386        &self,
2387        blueprint: Vec<LogMsg>,
2388        activation_cmd: BlueprintActivationCommand,
2389    ) {
2390        let mut blueprint_id = None;
2391        for msg in blueprint {
2392            if blueprint_id.is_none() {
2393                blueprint_id = Some(msg.store_id().clone());
2394            }
2395            self.record_msg(msg);
2396        }
2397
2398        if let Some(blueprint_id) = blueprint_id {
2399            if blueprint_id == activation_cmd.blueprint_id {
2400                // Let the viewer know that the blueprint has been fully received,
2401                // and that it can now be activated.
2402                // We don't want to activate half-loaded blueprints, because that can be confusing,
2403                // and can also lead to problems with view heuristics.
2404                self.record_msg(activation_cmd.into());
2405            } else {
2406                re_log::warn!(
2407                    "Blueprint ID mismatch when sending blueprint: {:?} != {:?}. Ignoring activation.",
2408                    blueprint_id,
2409                    activation_cmd.blueprint_id
2410                );
2411            }
2412        }
2413    }
2414
2415    /// Send a [`crate::blueprint::Blueprint`] to configure the viewer layout.
2416    pub fn send_blueprint_opts(
2417        &self,
2418        opts: &crate::blueprint::BlueprintOpts,
2419    ) -> RecordingStreamResult<()> {
2420        opts.blueprint.send(self, opts.activation)
2421    }
2422}
2423
2424impl fmt::Debug for RecordingStream {
2425    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2426        let with = |inner: &RecordingStreamInner| {
2427            let RecordingStreamInner {
2428                // This pattern match prevents _accidentally_ omitting data from the debug output
2429                // when new fields are added.
2430                store_info,
2431                recording_info,
2432                tick,
2433                cmds_tx: _,
2434                batcher: _,
2435                batcher_to_sink_handle: _,
2436                current_batcher_config,
2437                sink_dependent_batcher_config,
2438                importer_handles,
2439                pid_at_creation,
2440            } = inner;
2441
2442            f.debug_struct("RecordingStream")
2443                .field("store_info", &store_info)
2444                .field("recording_info", &recording_info)
2445                .field("tick", &tick)
2446                .field("current_batcher_config", &*current_batcher_config.lock())
2447                .field(
2448                    "sink_dependent_batcher_config",
2449                    &sink_dependent_batcher_config,
2450                )
2451                .field("pending_importers", &importer_handles.lock().len())
2452                .field("pid_at_creation", &pid_at_creation)
2453                .finish_non_exhaustive()
2454        };
2455
2456        match self.with(with) {
2457            Some(res) => res,
2458            None => write!(f, "RecordingStream {{ disabled }}"),
2459        }
2460    }
2461}
2462
2463// --- Stateful time ---
2464
2465/// Thread-local data.
2466#[derive(Default)]
2467struct ThreadInfo {
2468    /// The current time per-thread per-recording, which can be set by users.
2469    timepoints: HashMap<StoreId, TimePoint>,
2470}
2471
2472impl ThreadInfo {
2473    fn thread_now(rid: &StoreId) -> TimePoint {
2474        Self::with(|ti| ti.now(rid))
2475    }
2476
2477    fn set_thread_time(rid: &StoreId, timeline: TimelineName, cell: TimeCell) {
2478        Self::with(|ti| ti.set_time(rid, timeline, cell));
2479    }
2480
2481    fn unset_thread_time(rid: &StoreId, timeline: &TimelineName) {
2482        Self::with(|ti| ti.unset_time(rid, timeline));
2483    }
2484
2485    fn reset_thread_time(rid: &StoreId) {
2486        Self::with(|ti| ti.reset_time(rid));
2487    }
2488
2489    /// Get access to the thread-local [`ThreadInfo`].
2490    fn with<R>(f: impl FnOnce(&mut Self) -> R) -> R {
2491        use std::cell::RefCell;
2492        thread_local! {
2493            static THREAD_INFO: RefCell<Option<ThreadInfo>> = const { RefCell::new(None) };
2494        }
2495
2496        THREAD_INFO.with(|thread_info| {
2497            let mut thread_info = thread_info.borrow_mut();
2498            let thread_info = thread_info.get_or_insert_with(Self::default);
2499            f(thread_info)
2500        })
2501    }
2502
2503    fn now(&self, rid: &StoreId) -> TimePoint {
2504        let mut timepoint = self.timepoints.get(rid).cloned().unwrap_or_default();
2505        timepoint.insert_cell(TimelineName::log_time(), TimeCell::timestamp_now());
2506        timepoint
2507    }
2508
2509    fn set_time(&mut self, rid: &StoreId, timeline: TimelineName, cell: TimeCell) {
2510        self.timepoints
2511            .entry(rid.clone())
2512            .or_default()
2513            .insert_cell(timeline, cell);
2514    }
2515
2516    fn unset_time(&mut self, rid: &StoreId, timeline: &TimelineName) {
2517        if let Some(timepoint) = self.timepoints.get_mut(rid) {
2518            timepoint.remove(timeline);
2519        }
2520    }
2521
2522    fn reset_time(&mut self, rid: &StoreId) {
2523        if let Some(timepoint) = self.timepoints.get_mut(rid) {
2524            *timepoint = TimePoint::default();
2525        }
2526    }
2527}
2528
2529impl RecordingStream {
2530    /// Returns the current time of the recording on the current thread.
2531    pub fn now(&self) -> TimePoint {
2532        let f =
2533            move |inner: &RecordingStreamInner| ThreadInfo::thread_now(&inner.store_info.store_id);
2534        if let Some(res) = self.with(f) {
2535            res
2536        } else {
2537            re_log::warn_once!("Recording disabled - call to now() ignored");
2538            TimePoint::default()
2539        }
2540    }
2541
2542    /// Set the current time of the recording, for the current calling thread.
2543    ///
2544    /// Used for all subsequent logging performed from this same thread, until the next call
2545    /// to one of the index/time setting methods.
2546    ///
2547    /// There is no requirement of monotonicity. You can move the time backwards if you like.
2548    ///
2549    /// See also:
2550    /// - [`Self::set_time`]
2551    /// - [`Self::set_time_sequence`]
2552    /// - [`Self::set_duration_secs`]
2553    /// - [`Self::disable_timeline`]
2554    /// - [`Self::reset_time`]
2555    pub fn set_timepoint(&self, timepoint: impl Into<TimePoint>) {
2556        let f = move |inner: &RecordingStreamInner| {
2557            let timepoint = timepoint.into();
2558            for (timeline, time) in timepoint {
2559                ThreadInfo::set_thread_time(&inner.store_info.store_id, timeline, time);
2560            }
2561        };
2562
2563        if self.with(f).is_none() {
2564            re_log::warn_once!("Recording disabled - call to set_timepoint() ignored");
2565        }
2566    }
2567
2568    /// Set the current value of one of the timelines.
2569    ///
2570    /// Used for all subsequent logging performed from this same thread, until the next call
2571    /// to one of the index/time setting methods.
2572    ///
2573    /// There is no requirement of monotonicity. You can move the time backwards if you like.
2574    ///
2575    /// Example:
2576    /// ```no_run
2577    /// # mod rerun { pub use re_sdk::*; }
2578    /// # let rec: rerun::RecordingStream = unimplemented!();
2579    /// rec.set_time("frame_nr", rerun::TimeCell::from_sequence(42));
2580    /// rec.set_time("duration", std::time::Duration::from_millis(123));
2581    /// rec.set_time("capture_time", std::time::SystemTime::now());
2582    /// ```
2583    ///
2584    /// See also:
2585    /// - [`Self::set_timepoint`]
2586    /// - [`Self::set_time_sequence`]
2587    /// - [`Self::set_duration_secs`]
2588    /// - [`Self::disable_timeline`]
2589    /// - [`Self::reset_time`]
2590    pub fn set_time(&self, timeline: impl Into<TimelineName>, value: impl TryInto<TimeCell>) {
2591        let f = move |inner: &RecordingStreamInner| {
2592            let timeline = timeline.into();
2593            if let Ok(value) = value.try_into() {
2594                ThreadInfo::set_thread_time(&inner.store_info.store_id, timeline, value);
2595            } else {
2596                re_log::warn_once!(
2597                    "set_time({timeline}): Failed to convert the given value to an TimeCell"
2598                );
2599            }
2600        };
2601
2602        if self.with(f).is_none() {
2603            re_log::warn_once!("Recording disabled - call to set_time() ignored");
2604        }
2605    }
2606
2607    /// Set the current time of the recording, for the current calling thread.
2608    ///
2609    /// Short for `set_time(timeline, rerun::TimeCell::from_sequence(sequence))`.
2610    ///
2611    /// Used for all subsequent logging performed from this same thread, until the next call
2612    /// to one of the index/time setting methods.
2613    ///
2614    /// For example: `rec.set_time_sequence("frame_nr", frame_nr)`.
2615    /// You can remove a timeline again using `rec.disable_timeline("frame_nr")`.
2616    ///
2617    /// There is no requirement of monotonicity. You can move the time backwards if you like.
2618    ///
2619    /// See also:
2620    /// - [`Self::set_time`]
2621    /// - [`Self::set_timepoint`]
2622    /// - [`Self::set_duration_secs`]
2623    /// - [`Self::disable_timeline`]
2624    /// - [`Self::reset_time`]
2625    #[inline]
2626    pub fn set_time_sequence(&self, timeline: impl Into<TimelineName>, sequence: impl Into<i64>) {
2627        self.set_time(timeline, TimeCell::from_sequence(sequence.into()));
2628    }
2629
2630    /// Set the current time of the recording, for the current calling thread.
2631    ///
2632    /// Short for `set_time(timeline, std::time::Duration::from_secs_f64(secs))`..
2633    ///
2634    /// Used for all subsequent logging performed from this same thread, until the next call
2635    /// to one of the index/time setting methods.
2636    ///
2637    /// For example: `rec.set_duration_secs("time_since_start", time_offset)`.
2638    /// You can remove a timeline again using `rec.disable_timeline("time_since_start")`.
2639    ///
2640    /// There is no requirement of monotonicity. You can move the time backwards if you like.
2641    ///
2642    /// See also:
2643    /// - [`Self::set_time`]
2644    /// - [`Self::set_timepoint`]
2645    /// - [`Self::set_timestamp_secs_since_epoch`]
2646    /// - [`Self::set_time_sequence`]
2647    /// - [`Self::disable_timeline`]
2648    /// - [`Self::reset_time`]
2649    #[inline]
2650    pub fn set_duration_secs(&self, timeline: impl Into<TimelineName>, secs: impl Into<f64>) {
2651        let secs = secs.into();
2652        if let Ok(duration) = std::time::Duration::try_from_secs_f64(secs) {
2653            self.set_time(timeline, duration);
2654        } else {
2655            re_log::error_once!("set_duration_secs: can't set time to {secs}");
2656        }
2657    }
2658
2659    /// Set a timestamp as seconds since Unix epoch (1970-01-01 00:00:00 UTC).
2660    ///
2661    /// Short for `self.set_time(timeline, rerun::TimeCell::from_timestamp_secs_since_epoch(secs))`.
2662    ///
2663    /// Used for all subsequent logging performed from this same thread, until the next call
2664    /// to one of the index/time setting methods.
2665    ///
2666    /// You can remove a timeline again using `rec.disable_timeline(timeline)`.
2667    ///
2668    /// There is no requirement of monotonicity. You can move the time backwards if you like.
2669    ///
2670    /// See also:
2671    /// - [`Self::set_time`]
2672    /// - [`Self::set_timepoint`]
2673    /// - [`Self::set_duration_secs`]
2674    /// - [`Self::set_time_sequence`]
2675    /// - [`Self::set_timestamp_nanos_since_epoch`]
2676    /// - [`Self::disable_timeline`]
2677    /// - [`Self::reset_time`]
2678    #[inline]
2679    pub fn set_timestamp_secs_since_epoch(
2680        &self,
2681        timeline: impl Into<TimelineName>,
2682        secs: impl Into<f64>,
2683    ) {
2684        self.set_time(
2685            timeline,
2686            TimeCell::from_timestamp_secs_since_epoch(secs.into()),
2687        );
2688    }
2689
2690    /// Set a timestamp as nanoseconds since Unix epoch (1970-01-01 00:00:00 UTC).
2691    ///
2692    /// Short for `self.set_time(timeline, rerun::TimeCell::set_timestamp_nanos_since_epoch(secs))`.
2693    ///
2694    /// Used for all subsequent logging performed from this same thread, until the next call
2695    /// to one of the index/time setting methods.
2696    ///
2697    /// You can remove a timeline again using `rec.disable_timeline(timeline)`.
2698    ///
2699    /// There is no requirement of monotonicity. You can move the time backwards if you like.
2700    ///
2701    /// See also:
2702    /// - [`Self::set_time`]
2703    /// - [`Self::set_timepoint`]
2704    /// - [`Self::set_duration_secs`]
2705    /// - [`Self::set_time_sequence`]
2706    /// - [`Self::set_timestamp_secs_since_epoch`]
2707    /// - [`Self::disable_timeline`]
2708    /// - [`Self::reset_time`]
2709    #[inline]
2710    pub fn set_timestamp_nanos_since_epoch(
2711        &self,
2712        timeline: impl Into<TimelineName>,
2713        nanos: impl Into<i64>,
2714    ) {
2715        self.set_time(
2716            timeline,
2717            TimeCell::from_timestamp_nanos_since_epoch(nanos.into()),
2718        );
2719    }
2720
2721    /// Clears out the current time of the recording for the specified timeline, for the
2722    /// current calling thread.
2723    ///
2724    /// For example: `rec.disable_timeline("frame")`, `rec.disable_timeline("sim_time")`.
2725    ///
2726    /// See also:
2727    /// - [`Self::set_timepoint`]
2728    /// - [`Self::set_time_sequence`]
2729    /// - [`Self::reset_time`]
2730    pub fn disable_timeline(&self, timeline: impl Into<TimelineName>) {
2731        let f = move |inner: &RecordingStreamInner| {
2732            let timeline = timeline.into();
2733            ThreadInfo::unset_thread_time(&inner.store_info.store_id, &timeline);
2734        };
2735
2736        if self.with(f).is_none() {
2737            re_log::warn_once!("Recording disabled - call to disable_timeline() ignored");
2738        }
2739    }
2740
2741    /// Clears out the current time of the recording, for the current calling thread.
2742    ///
2743    /// Used for all subsequent logging performed from this same thread, until the next call
2744    /// to one of the index/time setting methods.
2745    ///
2746    /// For example: `rec.reset_time()`.
2747    ///
2748    /// See also:
2749    /// - [`Self::set_timepoint`]
2750    /// - [`Self::set_time_sequence`]
2751    /// - [`Self::disable_timeline`]
2752    pub fn reset_time(&self) {
2753        let f = move |inner: &RecordingStreamInner| {
2754            ThreadInfo::reset_thread_time(&inner.store_info.store_id);
2755        };
2756
2757        if self.with(f).is_none() {
2758            re_log::warn_once!("Recording disabled - call to reset_time() ignored");
2759        }
2760    }
2761}
2762
2763// ---
2764
2765#[cfg(test)]
2766mod tests {
2767    use insta::assert_debug_snapshot;
2768    use itertools::Itertools as _;
2769    use re_log_types::example_components::{MyLabel, MyPoints};
2770    use re_sdk_types::SerializedComponentBatch;
2771
2772    use super::*;
2773
2774    struct DisplayDescrs(Chunk);
2775
2776    impl std::fmt::Debug for DisplayDescrs {
2777        #[inline]
2778        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2779            f.debug_list()
2780                .entries(
2781                    self.0
2782                        .component_descriptors()
2783                        .map(|d| d.display_name().to_owned())
2784                        .sorted(),
2785                )
2786                .finish()
2787        }
2788    }
2789
2790    #[test]
2791    fn impl_send_sync() {
2792        fn assert_send_sync<T: Send + Sync>() {}
2793        assert_send_sync::<RecordingStream>();
2794    }
2795
2796    #[test]
2797    fn never_flush() {
2798        let rec = RecordingStreamBuilder::new("rerun_example_never_flush")
2799            .enabled(true)
2800            .batcher_config(ChunkBatcherConfig::NEVER)
2801            .buffered()
2802            .unwrap();
2803
2804        let store_info = rec.store_info().unwrap();
2805
2806        let rows = example_rows(false);
2807        for row in rows.clone() {
2808            rec.record_row("a".into(), row, false);
2809        }
2810
2811        let storage = rec.memory();
2812        let mut msgs = {
2813            let mut msgs = storage.take();
2814            msgs.reverse();
2815            msgs
2816        };
2817
2818        // First message should be a set_store_info resulting from the original sink swap to
2819        // buffered mode.
2820        match msgs.pop().unwrap() {
2821            LogMsg::SetStoreInfo(msg) => {
2822                assert!(msg.row_id != *RowId::ZERO);
2823                similar_asserts::assert_eq!(store_info, msg.info);
2824            }
2825            _ => panic!("expected SetStoreInfo"),
2826        }
2827
2828        // Second message should be a set_store_info resulting from the later sink swap from
2829        // buffered mode into in-memory mode.
2830        // This arrives _before_ the data itself since we're using manual flushing.
2831        match msgs.pop().unwrap() {
2832            LogMsg::SetStoreInfo(msg) => {
2833                assert!(msg.row_id != *RowId::ZERO);
2834                similar_asserts::assert_eq!(store_info, msg.info);
2835            }
2836            _ => panic!("expected SetStoreInfo"),
2837        }
2838
2839        // The following flushes were sent as a result of the implicit flush when swapping the
2840        // underlying sink from buffered to in-memory.
2841
2842        // Chunk that contains the `RecordingInfo`.
2843        match msgs.pop().unwrap() {
2844            LogMsg::ArrowMsg(rid, msg) => {
2845                assert_eq!(store_info.store_id, rid);
2846
2847                let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2848
2849                chunk.sanity_check().unwrap();
2850
2851                assert_debug_snapshot!(DisplayDescrs(chunk));
2852            }
2853            _ => panic!("expected ArrowMsg"),
2854        }
2855
2856        // Final message is the batched chunk itself.
2857        match msgs.pop().unwrap() {
2858            LogMsg::ArrowMsg(rid, msg) => {
2859                assert_eq!(store_info.store_id, rid);
2860
2861                let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2862
2863                chunk.sanity_check().unwrap();
2864
2865                assert_debug_snapshot!(DisplayDescrs(chunk));
2866            }
2867            _ => panic!("expected ArrowMsg"),
2868        }
2869
2870        // That's all.
2871        assert!(msgs.pop().is_none());
2872    }
2873
2874    #[test]
2875    fn always_flush() {
2876        let rec = RecordingStreamBuilder::new("rerun_example_always_flush")
2877            .enabled(true)
2878            .batcher_config(ChunkBatcherConfig::ALWAYS_TEST_ONLY)
2879            .buffered()
2880            .unwrap();
2881
2882        let store_info = rec.store_info().unwrap();
2883
2884        let rows = example_rows(false);
2885        for row in rows.clone() {
2886            rec.record_row("a".into(), row, false);
2887        }
2888
2889        let storage = rec.memory();
2890        let mut msgs = {
2891            let mut msgs = storage.take();
2892            msgs.reverse();
2893            msgs
2894        };
2895
2896        // First message should be a set_store_info resulting from the original sink swap to
2897        // buffered mode.
2898        match msgs.pop().unwrap() {
2899            LogMsg::SetStoreInfo(msg) => {
2900                assert!(msg.row_id != *RowId::ZERO);
2901                similar_asserts::assert_eq!(store_info, msg.info);
2902            }
2903            _ => panic!("expected SetStoreInfo"),
2904        }
2905
2906        // Second message should be a set_store_info resulting from the later sink swap from
2907        // buffered mode into in-memory mode.
2908        // This arrives _before_ the data itself since we're using manual flushing.
2909        match msgs.pop().unwrap() {
2910            LogMsg::SetStoreInfo(msg) => {
2911                assert!(msg.row_id != *RowId::ZERO);
2912                similar_asserts::assert_eq!(store_info, msg.info);
2913            }
2914            _ => panic!("expected SetStoreInfo"),
2915        }
2916
2917        let mut assert_next_row = || match msgs.pop().unwrap() {
2918            LogMsg::ArrowMsg(rid, msg) => {
2919                assert_eq!(store_info.store_id, rid);
2920
2921                let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2922
2923                chunk.sanity_check().unwrap();
2924
2925                assert_debug_snapshot!(DisplayDescrs(chunk));
2926            }
2927            _ => panic!("expected ArrowMsg"),
2928        };
2929
2930        // 3rd, 4th, 5th, and 6th messages are all the single-row batched chunks themselves,
2931        // which were sent as a result of the implicit flush when swapping the underlying sink
2932        // from buffered to in-memory. Note that these messages contain the 2 recording property
2933        // chunks.
2934        assert_next_row(); // Contains `RecordingInfo`
2935        assert_next_row();
2936        assert_next_row();
2937        assert_next_row();
2938
2939        // That's all.
2940        assert!(msgs.pop().is_none());
2941    }
2942
2943    #[test]
2944    fn flush_hierarchy() {
2945        let (rec, storage) = RecordingStreamBuilder::new("rerun_example_flush_hierarchy")
2946            .enabled(true)
2947            .batcher_config(ChunkBatcherConfig::NEVER)
2948            .memory()
2949            .unwrap();
2950
2951        let store_info = rec.store_info().unwrap();
2952
2953        let rows = example_rows(false);
2954        for row in rows.clone() {
2955            rec.record_row("a".into(), row, false);
2956        }
2957
2958        {
2959            let mut msgs = {
2960                let mut msgs = storage.take();
2961                msgs.reverse();
2962                msgs
2963            };
2964
2965            // First message should be a set_store_info resulting from the original sink swap
2966            // to in-memory mode.
2967            match msgs.pop().unwrap() {
2968                LogMsg::SetStoreInfo(msg) => {
2969                    assert!(msg.row_id != *RowId::ZERO);
2970                    similar_asserts::assert_eq!(store_info, msg.info);
2971                }
2972                _ => panic!("expected SetStoreInfo"),
2973            }
2974
2975            // For reasons, MemorySink ends up with 2 StoreInfos.
2976            // TODO(jleibs): Avoid a redundant StoreInfo message.
2977            match msgs.pop().unwrap() {
2978                LogMsg::SetStoreInfo(msg) => {
2979                    assert!(msg.row_id != *RowId::ZERO);
2980                    similar_asserts::assert_eq!(store_info, msg.info);
2981                }
2982                _ => panic!("expected SetStoreInfo"),
2983            }
2984
2985            // MemorySinkStorage transparently handles flushing during `take()`!
2986
2987            // The batch that contains the `RecordingInfo`.
2988            match msgs.pop().unwrap() {
2989                LogMsg::ArrowMsg(rid, msg) => {
2990                    assert_eq!(store_info.store_id, rid);
2991
2992                    let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2993
2994                    chunk.sanity_check().unwrap();
2995
2996                    assert_debug_snapshot!(DisplayDescrs(chunk));
2997                }
2998                _ => panic!("expected ArrowMsg"),
2999            }
3000
3001            // The batched chunk itself, which was sent as a result of the explicit flush above.
3002            match msgs.pop().unwrap() {
3003                LogMsg::ArrowMsg(rid, msg) => {
3004                    assert_eq!(store_info.store_id, rid);
3005
3006                    let chunk = Chunk::from_arrow_msg(&msg).unwrap();
3007
3008                    chunk.sanity_check().unwrap();
3009
3010                    assert_debug_snapshot!(DisplayDescrs(chunk));
3011                }
3012                _ => panic!("expected ArrowMsg"),
3013            }
3014
3015            // That's all.
3016            assert!(msgs.pop().is_none());
3017        }
3018    }
3019
3020    #[test]
3021    fn disabled() {
3022        let (rec, storage) = RecordingStreamBuilder::new("rerun_example_disabled")
3023            .enabled(false)
3024            .batcher_config(ChunkBatcherConfig::ALWAYS_TEST_ONLY)
3025            .memory()
3026            .unwrap();
3027
3028        assert_eq!(rec.ref_count(), 0);
3029
3030        let rows = example_rows(false);
3031        for row in rows.clone() {
3032            rec.record_row("a".into(), row, false);
3033        }
3034
3035        let mut msgs = {
3036            let mut msgs = storage.take();
3037            msgs.reverse();
3038            msgs
3039        };
3040
3041        // That's all.
3042        assert!(msgs.pop().is_none());
3043    }
3044
3045    #[test]
3046    fn test_set_thread_local() {
3047        // Regression-test for https://github.com/rerun-io/rerun/issues/2889
3048        std::thread::Builder::new()
3049            .name("test_thead".to_owned())
3050            .spawn(|| {
3051                let stream = RecordingStreamBuilder::new("rerun_example_test")
3052                    .buffered()
3053                    .unwrap();
3054                RecordingStream::set_thread_local(StoreKind::Recording, Some(stream));
3055            })
3056            .unwrap()
3057            .join()
3058            .unwrap();
3059    }
3060
3061    fn example_rows(static_: bool) -> Vec<PendingRow> {
3062        use re_log_types::example_components::{MyColor, MyLabel, MyPoint};
3063        use re_sdk_types::Loggable;
3064
3065        let mut tick = 0i64;
3066        let mut timepoint = |frame_nr: i64| {
3067            let mut tp = TimePoint::default();
3068            if !static_ {
3069                tp.insert(Timeline::log_time(), re_log_types::Timestamp::now());
3070                tp.insert(Timeline::log_tick(), tick);
3071                tp.insert(Timeline::new_sequence("frame_nr"), frame_nr);
3072            }
3073            tick += 1;
3074            tp
3075        };
3076
3077        let row0 = {
3078            PendingRow {
3079                row_id: RowId::new(),
3080                timepoint: timepoint(1),
3081                components: [
3082                    (
3083                        MyPoints::descriptor_points().component,
3084                        SerializedComponentBatch::new(
3085                            <MyPoint as Loggable>::to_arrow([
3086                                MyPoint::new(10.0, 10.0),
3087                                MyPoint::new(20.0, 20.0),
3088                            ])
3089                            .unwrap(),
3090                            MyPoints::descriptor_points(),
3091                        ),
3092                    ), //
3093                    (
3094                        MyPoints::descriptor_colors().component,
3095                        SerializedComponentBatch::new(
3096                            <MyColor as Loggable>::to_arrow([MyColor(0x8080_80FF)]).unwrap(),
3097                            MyPoints::descriptor_colors(),
3098                        ),
3099                    ), //
3100                    (
3101                        MyPoints::descriptor_labels().component,
3102                        SerializedComponentBatch::new(
3103                            <MyLabel as Loggable>::to_arrow([] as [MyLabel; 0]).unwrap(),
3104                            MyPoints::descriptor_labels(),
3105                        ),
3106                    ), //
3107                ]
3108                .into_iter()
3109                .collect(),
3110            }
3111        };
3112
3113        let row1 = {
3114            PendingRow {
3115                row_id: RowId::new(),
3116                timepoint: timepoint(1),
3117                components: [
3118                    (
3119                        MyPoints::descriptor_points().component,
3120                        SerializedComponentBatch::new(
3121                            <MyPoint as Loggable>::to_arrow([] as [MyPoint; 0]).unwrap(),
3122                            MyPoints::descriptor_points(),
3123                        ),
3124                    ), //
3125                    (
3126                        MyPoints::descriptor_colors().component,
3127                        SerializedComponentBatch::new(
3128                            <MyColor as Loggable>::to_arrow([] as [MyColor; 0]).unwrap(),
3129                            MyPoints::descriptor_colors(),
3130                        ),
3131                    ), //
3132                    (
3133                        MyPoints::descriptor_labels().component,
3134                        SerializedComponentBatch::new(
3135                            <MyLabel as Loggable>::to_arrow([] as [MyLabel; 0]).unwrap(),
3136                            MyPoints::descriptor_labels(),
3137                        ),
3138                    ), //
3139                ]
3140                .into_iter()
3141                .collect(),
3142            }
3143        };
3144
3145        let row2 = {
3146            PendingRow {
3147                row_id: RowId::new(),
3148                timepoint: timepoint(1),
3149                components: [
3150                    (
3151                        MyPoints::descriptor_points().component,
3152                        SerializedComponentBatch::new(
3153                            <MyPoint as Loggable>::to_arrow([] as [MyPoint; 0]).unwrap(),
3154                            MyPoints::descriptor_points(),
3155                        ),
3156                    ), //
3157                    (
3158                        MyPoints::descriptor_colors().component,
3159                        SerializedComponentBatch::new(
3160                            <MyColor as Loggable>::to_arrow([MyColor(0xFFFF_FFFF)]).unwrap(),
3161                            MyPoints::descriptor_colors(),
3162                        ),
3163                    ), //
3164                    (
3165                        MyPoints::descriptor_labels().component,
3166                        SerializedComponentBatch::new(
3167                            <MyLabel as Loggable>::to_arrow([MyLabel("hey".into())]).unwrap(),
3168                            MyPoints::descriptor_labels(),
3169                        ),
3170                    ), //
3171                ]
3172                .into_iter()
3173                .collect(),
3174            }
3175        };
3176
3177        vec![row0, row1, row2]
3178    }
3179
3180    // See <https://github.com/rerun-io/rerun/pull/8587> for context.
3181    #[test]
3182    fn allows_componentbatch_unsized() {
3183        let labels = [
3184            MyLabel("a".into()),
3185            MyLabel("b".into()),
3186            MyLabel("c".into()),
3187        ];
3188
3189        let (rec, _mem) = RecordingStreamBuilder::new("rerun_example_test_componentbatch_unsized")
3190            .default_enabled(false)
3191            .enabled(false)
3192            .memory()
3193            .unwrap();
3194
3195        // This call used to *not* compile due to a lack of `?Sized` bounds.
3196        use re_sdk_types::ComponentBatch as _;
3197        rec.log(
3198            "labels",
3199            &labels
3200                .try_serialized(MyPoints::descriptor_labels())
3201                .unwrap(),
3202        )
3203        .unwrap();
3204    }
3205
3206    struct BatcherConfigTestSink {
3207        config: ChunkBatcherConfig,
3208    }
3209
3210    impl LogSink for BatcherConfigTestSink {
3211        fn default_batcher_config(&self) -> ChunkBatcherConfig {
3212            self.config
3213        }
3214
3215        fn send(&self, _msg: LogMsg) {
3216            // noop
3217        }
3218
3219        fn flush_blocking(&self, _timeout: Duration) -> Result<(), SinkFlushError> {
3220            Ok(())
3221        }
3222    }
3223
3224    struct ScopedEnvVarSet {
3225        key: &'static str,
3226    }
3227
3228    impl ScopedEnvVarSet {
3229        #[expect(unsafe_code)]
3230        fn new(key: &'static str, value: &'static str) -> Self {
3231            // SAFETY: only used in tests.
3232            unsafe { std::env::set_var(key, value) };
3233            Self { key }
3234        }
3235    }
3236
3237    impl Drop for ScopedEnvVarSet {
3238        #[expect(unsafe_code)]
3239        fn drop(&mut self) {
3240            // SAFETY: only used in tests.
3241            unsafe {
3242                std::env::remove_var(self.key);
3243            }
3244        }
3245    }
3246
3247    const CONFIG_CHANGE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
3248
3249    fn clear_environment() {
3250        // SAFETY: only used in tests.
3251        #[expect(unsafe_code)]
3252        unsafe {
3253            std::env::remove_var("RERUN_CHUNK_MAX_ROWS_IF_UNSORTED");
3254            std::env::remove_var("RERUN_FLUSH_NUM_BYTES");
3255            std::env::remove_var("RERUN_FLUSH_NUM_ROWS");
3256            std::env::remove_var("RERUN_FLUSH_TICK_SECS");
3257            std::env::remove_var("RERUN_MAX_CHUNK_ROWS_IF_UNSORTED");
3258        }
3259    }
3260
3261    #[test]
3262    fn test_sink_dependent_batcher_config() {
3263        clear_environment();
3264
3265        let (tx, rx) = crossbeam::channel::bounded(16);
3266
3267        let rec = RecordingStreamBuilder::new("rerun_example_test_batcher_config")
3268            .batcher_hooks(BatcherHooks {
3269                on_config_change: Some(Arc::new(move |config: &ChunkBatcherConfig| {
3270                    re_quota_channel::send_crossbeam(&tx, *config).unwrap();
3271                })),
3272                ..BatcherHooks::NONE
3273            })
3274            .buffered()
3275            .unwrap();
3276
3277        let new_config = rx
3278            .recv_timeout(CONFIG_CHANGE_TIMEOUT)
3279            .expect("no config change message received within timeout");
3280        assert_eq!(
3281            new_config,
3282            ChunkBatcherConfig::from_env().unwrap(),
3283            "Buffered sink should uses the config from the environment"
3284        );
3285
3286        // Change sink to our custom sink. Will it take over the setting?
3287        let injected_config = ChunkBatcherConfig {
3288            flush_tick: std::time::Duration::from_secs(123),
3289            flush_num_bytes: 123,
3290            flush_num_rows: 123,
3291            ..new_config
3292        };
3293        rec.set_sink(Box::new(BatcherConfigTestSink {
3294            config: injected_config,
3295        }));
3296        let new_config = rx
3297            .recv_timeout(CONFIG_CHANGE_TIMEOUT)
3298            .expect("no config change message received within timeout");
3299
3300        assert_eq!(new_config, injected_config);
3301
3302        // Set flush num bytes through env var and set the sink again.
3303        // check that the env var is respected.
3304        let _scoped_env_guard = ScopedEnvVarSet::new("RERUN_FLUSH_NUM_BYTES", "456");
3305        rec.set_sink(Box::new(BatcherConfigTestSink {
3306            config: injected_config,
3307        }));
3308        let new_config = rx
3309            .recv_timeout(CONFIG_CHANGE_TIMEOUT)
3310            .expect("no config change message received within timeout");
3311        assert_eq!(
3312            new_config,
3313            ChunkBatcherConfig {
3314                flush_num_bytes: 456,
3315                ..injected_config
3316            },
3317        );
3318    }
3319
3320    #[test]
3321    fn test_explicit_batcher_config() {
3322        clear_environment();
3323
3324        // This environment variable should *not* override the explicit config.
3325        let _scoped_env_guard = ScopedEnvVarSet::new("RERUN_FLUSH_TICK_SECS", "456");
3326        let explicit_config = ChunkBatcherConfig {
3327            flush_tick: std::time::Duration::from_secs(123),
3328            flush_num_bytes: 123,
3329            flush_num_rows: 123,
3330            ..ChunkBatcherConfig::DEFAULT
3331        };
3332
3333        let (tx, rx) = crossbeam::channel::bounded(16);
3334        let rec = RecordingStreamBuilder::new("rerun_example_test_batcher_config")
3335            .batcher_config(explicit_config)
3336            .batcher_hooks(BatcherHooks {
3337                on_config_change: Some(Arc::new(move |config: &ChunkBatcherConfig| {
3338                    re_quota_channel::send_crossbeam(&tx, *config).unwrap();
3339                })),
3340                ..BatcherHooks::NONE
3341            })
3342            .buffered()
3343            .unwrap();
3344
3345        let new_config = rx
3346            .recv_timeout(CONFIG_CHANGE_TIMEOUT)
3347            .expect("no config change message received within timeout");
3348        assert_eq!(new_config, explicit_config);
3349
3350        // Changing the sink should have no effect since an explicit config is in place.
3351        rec.set_sink(Box::new(BatcherConfigTestSink {
3352            config: ChunkBatcherConfig::ALWAYS_TEST_ONLY,
3353        }));
3354        // Don't want to stall the test for CONFIG_CHANGE_TIMEOUT here.
3355        let new_config_recv_result = rx.recv_timeout(std::time::Duration::from_millis(100));
3356        assert_eq!(
3357            new_config_recv_result,
3358            Err(crossbeam::channel::RecvTimeoutError::Timeout)
3359        );
3360    }
3361}