re_sdk/
recording_stream.rs

use std::fmt;
use std::io::IsTerminal as _;
use std::sync::Weak;
use std::sync::{atomic::AtomicI64, Arc};
use std::time::Duration;

use ahash::HashMap;
use crossbeam::channel::{Receiver, Sender};
use itertools::Either;
use nohash_hasher::IntMap;
use parking_lot::Mutex;

use re_chunk::{
    Chunk, ChunkBatcher, ChunkBatcherConfig, ChunkBatcherError, ChunkComponents, ChunkError,
    ChunkId, PendingRow, RowId, TimeColumn,
};
use re_log_types::{
    ApplicationId, ArrowRecordBatchReleaseCallback, BlueprintActivationCommand, EntityPath, LogMsg,
    StoreId, StoreInfo, StoreKind, StoreSource, TimeCell, TimeInt, TimePoint, Timeline,
    TimelineName,
};
use re_types::archetypes::RecordingProperties;
use re_types::components::Timestamp;
use re_types::{AsComponents, SerializationError, SerializedComponentColumn};

#[cfg(feature = "web_viewer")]
use re_web_viewer_server::WebViewerServerPort;

use crate::binary_stream_sink::BinaryStreamStorage;
use crate::sink::{LogSink, MemorySinkStorage};

// ---

/// Private environment variable meant for tests.
///
/// When set, all recording streams will write to disk at the path indicated by the env-var rather
/// than doing what they were asked to do: `connect()`, `buffered()`, even `save()` will re-use the same sink.
const ENV_FORCE_SAVE: &str = "_RERUN_TEST_FORCE_SAVE";

/// Returns the path of the forced sink if the private environment variable `_RERUN_TEST_FORCE_SAVE` is set.
///
/// Newly created [`RecordingStream`]s should use a [`crate::sink::FileSink`] pointing to this path.
/// Furthermore, [`RecordingStream::set_sink`] calls after this should not swap out to a new sink but re-use the existing one.
/// Note that creating a new [`crate::sink::FileSink`] to the same file path (even temporarily) can cause
/// a race between file creation (and thus clearing) and pending file writes.
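///
/// A sketch of how a test might exercise this (the path below is purely illustrative):
///
/// ```ignore
/// // Force every stream created by this process to write to this file instead of
/// // whatever sink it would normally use:
/// std::env::set_var("_RERUN_TEST_FORCE_SAVE", "/tmp/forced_test_output.rrd");
/// assert!(re_sdk::forced_sink_path().is_some());
/// ```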
pub fn forced_sink_path() -> Option<String> {
    std::env::var(ENV_FORCE_SAVE).ok()
}

/// Errors that can occur when creating/manipulating a [`RecordingStream`].
#[derive(thiserror::Error, Debug)]
pub enum RecordingStreamError {
    /// Error within the underlying file sink.
    #[error("Failed to create the underlying file sink: {0}")]
    FileSink(#[from] re_log_encoding::FileSinkError),

    /// Error while building a chunk.
    #[error("Failed to convert data to a valid chunk: {0}")]
    Chunk(#[from] ChunkError),

    /// Error within the underlying chunk batcher.
    #[error("Failed to spawn the underlying batcher: {0}")]
    ChunkBatcher(#[from] ChunkBatcherError),

    /// Error within the underlying serializer.
    #[error("Failed to serialize component data: {0}")]
    Serialization(#[from] SerializationError),

    /// Error spawning one of the background threads.
    #[error("Failed to spawn background thread '{name}': {err}")]
    SpawnThread {
        /// Name of the thread.
        name: String,

        /// Inner error explaining why the thread failed to spawn.
        err: std::io::Error,
    },

    /// Error spawning a Rerun Viewer process.
    #[error(transparent)] // makes bubbling all the way up to main look nice
    SpawnViewer(#[from] crate::SpawnError),

    /// Failure to host a web viewer and/or Rerun server.
    #[cfg(feature = "web_viewer")]
    #[error(transparent)]
    WebSink(#[from] crate::web_viewer::WebViewerSinkError),

    /// An error occurred while attempting to use a [`re_data_loader::DataLoader`].
    #[cfg(feature = "data_loaders")]
    #[error(transparent)]
    DataLoaderError(#[from] re_data_loader::DataLoaderError),

    /// Invalid gRPC server address.
    #[error(transparent)]
    UriError(#[from] re_uri::Error),

    /// Invalid endpoint.
    #[error("not a `/proxy` endpoint")]
    NotAProxyEndpoint,

    /// Invalid bind IP.
    #[error(transparent)]
    InvalidAddress(#[from] std::net::AddrParseError),
}

/// Result type for operations that create or manipulate a [`RecordingStream`].
pub type RecordingStreamResult<T> = Result<T, RecordingStreamError>;

// ---

/// Construct a [`RecordingStream`].
///
/// ``` no_run
/// # use re_sdk::RecordingStreamBuilder;
/// let rec = RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
///
/// Automatically sends a [`Chunk`] with the default [`RecordingProperties`] to
/// the sink, unless an explicit `recording_id` is set via [`RecordingStreamBuilder::recording_id`].
#[derive(Debug)]
pub struct RecordingStreamBuilder {
    application_id: ApplicationId,
    store_kind: StoreKind,
    store_id: Option<StoreId>,
    store_source: Option<StoreSource>,

    default_enabled: bool,
    enabled: Option<bool>,

    batcher_config: Option<ChunkBatcherConfig>,

    // Optional user-defined recording properties.
    should_send_properties: bool,
    properties: RecordingProperties,
}

impl RecordingStreamBuilder {
    /// Create a new [`RecordingStreamBuilder`] with the given [`ApplicationId`].
    ///
    /// The [`ApplicationId`] is usually the name of your app.
    ///
    /// ```no_run
    /// # use re_sdk::RecordingStreamBuilder;
    /// let rec = RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    //
    // NOTE: track_caller so that we can see if we are being called from an official example.
    #[track_caller]
    pub fn new(application_id: impl Into<ApplicationId>) -> Self {
        let application_id = application_id.into();

        Self {
            application_id,
            store_kind: StoreKind::Recording,
            store_id: None,
            store_source: None,

            default_enabled: true,
            enabled: None,

            batcher_config: None,

            should_send_properties: true,
            properties: RecordingProperties::new()
                .with_start_time(re_types::components::Timestamp::now()),
        }
    }

    /// Set whether or not Rerun is enabled by default.
    ///
    /// If the `RERUN` environment variable is set, it will override this.
    ///
    /// See also: [`Self::enabled`].
    #[inline]
    pub fn default_enabled(mut self, default_enabled: bool) -> Self {
        self.default_enabled = default_enabled;
        self
    }

    /// Set whether or not Rerun is enabled.
    ///
    /// Setting this will ignore the `RERUN` environment variable.
    ///
    /// See also: [`Self::default_enabled`].
    #[inline]
    pub fn enabled(mut self, enabled: bool) -> Self {
        self.enabled = Some(enabled);
        self
    }

    /// Set the `RecordingId` for this context.
    ///
    /// If you're logging from multiple processes and want all the messages to end up in the same
    /// recording, you must make sure that they all set the same `RecordingId` using this function.
    ///
    /// Note that many stores can share the same [`ApplicationId`], but they all have
    /// unique `RecordingId`s.
    ///
    /// The default is to use a random `RecordingId`.
    ///
    /// When explicitly setting a `RecordingId`, the initial chunk that contains the recording
    /// properties will not be sent.
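    ///
    /// For example, two cooperating processes can both do the following to land in the same
    /// recording (the id string is arbitrary):
    ///
    /// ```no_run
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
    ///     .recording_id("my_shared_recording")
    ///     .connect_grpc()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```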
    #[inline]
    pub fn recording_id(mut self, recording_id: impl Into<String>) -> Self {
        self.store_id = Some(StoreId::from_string(
            StoreKind::Recording,
            recording_id.into(),
        ));
        self.send_properties(false)
    }

    /// Sets an optional name for the recording.
    #[inline]
    pub fn recording_name(mut self, name: impl Into<String>) -> Self {
        self.properties = self.properties.with_name(name.into());
        self
    }

    /// Sets an optional start time for the recording.
    #[inline]
    pub fn recording_started(mut self, started: impl Into<Timestamp>) -> Self {
        self.properties = self.properties.with_start_time(started);
        self
    }

    #[deprecated(since = "0.22.0", note = "use `send_properties` instead")]
    /// Disables sending the [`RecordingProperties`] chunk.
    #[inline]
    pub fn disable_properties(mut self) -> Self {
        self.should_send_properties = false;
        self
    }

    /// Whether the [`RecordingProperties`] chunk should be sent.
    #[inline]
    pub fn send_properties(mut self, should_send: bool) -> Self {
        self.should_send_properties = should_send;
        self
    }

    /// Set the [`StoreId`] for this context.
    ///
    /// If you're logging from multiple processes and want all the messages to end up as the same
    /// store, you must make sure they all set the same [`StoreId`] using this function.
    ///
    /// Note that many stores can share the same [`ApplicationId`], but they all have
    /// unique [`StoreId`]s.
    ///
    /// The default is to use a random [`StoreId`].
    #[inline]
    pub fn store_id(mut self, store_id: StoreId) -> Self {
        self.store_kind = store_id.kind;
        self.store_id = Some(store_id);
        self
    }

    /// Specifies the configuration of the internal data batching mechanism.
    ///
    /// See [`ChunkBatcher`] & [`ChunkBatcherConfig`] for more information.
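    ///
    /// A minimal sketch, simply passing the default configuration explicitly (assuming
    /// `ChunkBatcherConfig` is in scope; tweak individual fields as needed):
    ///
    /// ```ignore
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
    ///     .batcher_config(ChunkBatcherConfig::default())
    ///     .buffered()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```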
    #[inline]
    pub fn batcher_config(mut self, config: ChunkBatcherConfig) -> Self {
        self.batcher_config = Some(config);
        self
    }

    #[doc(hidden)]
    #[inline]
    pub fn store_source(mut self, store_source: StoreSource) -> Self {
        self.store_source = Some(store_source);
        self
    }

    #[doc(hidden)]
    #[inline]
    pub fn blueprint(mut self) -> Self {
        self.store_kind = StoreKind::Blueprint;
        self
    }

    /// Creates a new [`RecordingStream`] that starts in a buffering state (RAM).
    ///
    /// ## Example
    ///
    /// ```
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").buffered()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn buffered(self) -> RecordingStreamResult<RecordingStream> {
        let (enabled, store_info, properties, batcher_config) = self.into_args();
        if enabled {
            RecordingStream::new(
                store_info,
                properties,
                batcher_config,
                Box::new(crate::log_sink::BufferedSink::new()),
            )
        } else {
            re_log::debug!("Rerun disabled - call to buffered() ignored");
            Ok(RecordingStream::disabled())
        }
    }

    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
    /// [`crate::log_sink::MemorySink`].
    ///
    /// ## Example
    ///
    /// ```
    /// # fn log_data(_: &re_sdk::RecordingStream) { }
    ///
    /// let (rec, storage) = re_sdk::RecordingStreamBuilder::new("rerun_example_app").memory()?;
    ///
    /// log_data(&rec);
    ///
    /// let data = storage.take();
    ///
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn memory(
        self,
    ) -> RecordingStreamResult<(RecordingStream, crate::log_sink::MemorySinkStorage)> {
        let (enabled, store_info, properties, batcher_config) = self.into_args();
        let rec = if enabled {
            RecordingStream::new(
                store_info,
                properties,
                batcher_config,
                Box::new(crate::log_sink::BufferedSink::new()),
            )
        } else {
            re_log::debug!("Rerun disabled - call to memory() ignored");
            Ok(RecordingStream::disabled())
        }?;

        let sink = crate::log_sink::MemorySink::new(rec.clone());
        let storage = sink.buffer();
        // Using set_sink here is necessary because the MemorySink needs to know
        // its own RecordingStream, which means we can't use `new` above.
        // This has the downside of a bit of creation overhead and an extra StoreInfo
        // message being sent to the sink.
        // TODO(jleibs): Figure out a cleaner way to handle this.
        rec.set_sink(Box::new(sink));
        Ok((rec, storage))
    }

    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
    /// remote Rerun instance.
    ///
    /// See also [`Self::connect_grpc_opts`] if you wish to configure the connection.
    ///
    /// ## Example
    ///
    /// ```no_run
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").connect_grpc()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn connect_grpc(self) -> RecordingStreamResult<RecordingStream> {
        self.connect_grpc_opts(
            format!(
                "rerun+http://127.0.0.1:{}/proxy",
                re_grpc_server::DEFAULT_SERVER_PORT
            ),
            crate::default_flush_timeout(),
        )
    }

    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
    /// remote Rerun instance.
    ///
    /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
    /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
    /// call to `flush` to block indefinitely if a connection cannot be established.
    ///
    /// ## Example
    ///
    /// ```no_run
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
    ///     .connect_grpc_opts("rerun+http://127.0.0.1:9876/proxy", re_sdk::default_flush_timeout())?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn connect_grpc_opts(
        self,
        url: impl Into<String>,
        flush_timeout: Option<Duration>,
    ) -> RecordingStreamResult<RecordingStream> {
        let (enabled, store_info, properties, batcher_config) = self.into_args();
        if enabled {
            let url: String = url.into();
            let re_uri::RedapUri::Proxy(uri) = url.as_str().parse()? else {
                return Err(RecordingStreamError::NotAProxyEndpoint);
            };

            RecordingStream::new(
                store_info,
                properties,
                batcher_config,
                Box::new(crate::log_sink::GrpcSink::new(uri, flush_timeout)),
            )
        } else {
            re_log::debug!("Rerun disabled - call to connect_grpc() ignored");
            Ok(RecordingStream::disabled())
        }
    }

    #[cfg(feature = "server")]
    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
    /// locally hosted gRPC server.
    ///
    /// The server is hosted on the default IP and port, and may be connected to by any SDK or Viewer
    /// at `rerun+http://127.0.0.1:9876/proxy` or by just running `rerun --connect`.
    ///
    /// To configure the gRPC server's IP and port, use [`Self::serve_grpc_opts`] instead.
    ///
    /// The gRPC server will buffer in memory so that late connecting viewers will still get all the data.
    /// You can limit the amount of data buffered by the gRPC server using [`Self::serve_grpc_opts`],
    /// with the `server_memory_limit` argument. Once the memory limit is reached, the earliest logged data
    /// will be dropped. Static data is never dropped.
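    ///
    /// ## Example
    ///
    /// ```no_run
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").serve_grpc()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```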
    pub fn serve_grpc(self) -> RecordingStreamResult<RecordingStream> {
        self.serve_grpc_opts(
            "0.0.0.0",
            crate::DEFAULT_SERVER_PORT,
            re_memory::MemoryLimit::from_fraction_of_total(0.75),
        )
    }

    #[cfg(feature = "server")]
    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
    /// locally hosted gRPC server.
    ///
    /// The server is hosted on the given `bind_ip` and `port`, and may be connected to by any SDK or Viewer
    /// at `rerun+http://{bind_ip}:{port}/proxy`.
    ///
    /// `0.0.0.0` is a good default for `bind_ip`.
    ///
    /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
    /// You can limit the amount of data buffered by the gRPC server with the `server_memory_limit` argument.
    /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
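    ///
    /// ## Example
    ///
    /// Mirroring the defaults of [`Self::serve_grpc`] (this assumes `DEFAULT_SERVER_PORT` and
    /// `MemoryLimit` are re-exported at the crate root, as the other examples here suggest):
    ///
    /// ```ignore
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
    ///     .serve_grpc_opts(
    ///         "0.0.0.0",
    ///         re_sdk::DEFAULT_SERVER_PORT,
    ///         re_sdk::MemoryLimit::from_fraction_of_total(0.75),
    ///     )?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```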
    pub fn serve_grpc_opts(
        self,
        bind_ip: impl AsRef<str>,
        port: u16,
        server_memory_limit: re_memory::MemoryLimit,
    ) -> RecordingStreamResult<RecordingStream> {
        let (enabled, store_info, properties, batcher_config) = self.into_args();
        if enabled {
            RecordingStream::new(
                store_info,
                properties,
                batcher_config,
                Box::new(crate::grpc_server::GrpcServerSink::new(
                    bind_ip.as_ref(),
                    port,
                    server_memory_limit,
                )?),
            )
        } else {
            re_log::debug!("Rerun disabled - call to serve_grpc() ignored");
            Ok(RecordingStream::disabled())
        }
    }

    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to an
    /// RRD file on disk.
    ///
    /// The Rerun Viewer is able to read continuously from the resulting rrd file while it is being written.
    /// However, depending on your OS and configuration, changes may not be immediately visible due to file caching.
    /// This is a common issue on Windows and (to a lesser extent) on macOS.
    ///
    /// ## Example
    ///
    /// ```no_run
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    #[cfg(not(target_arch = "wasm32"))]
    pub fn save(
        self,
        path: impl Into<std::path::PathBuf>,
    ) -> RecordingStreamResult<RecordingStream> {
        let (enabled, store_info, properties, batcher_config) = self.into_args();

        if enabled {
            RecordingStream::new(
                store_info,
                properties,
                batcher_config,
                Box::new(crate::sink::FileSink::new(path)?),
            )
        } else {
            re_log::debug!("Rerun disabled - call to save() ignored");
            Ok(RecordingStream::disabled())
        }
    }

    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to stdout.
    ///
    /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
    /// default back to `buffered` mode, in order not to break the user's terminal.
    ///
    /// ## Example
    ///
    /// ```no_run
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").stdout()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    #[cfg(not(target_arch = "wasm32"))]
    pub fn stdout(self) -> RecordingStreamResult<RecordingStream> {
        if std::io::stdout().is_terminal() {
            re_log::debug!("Ignored call to stdout() because stdout is a terminal");
            return self.buffered();
        }

        let (enabled, store_info, properties, batcher_config) = self.into_args();

        if enabled {
            RecordingStream::new(
                store_info,
                properties,
                batcher_config,
                Box::new(crate::sink::FileSink::stdout()?),
            )
        } else {
            re_log::debug!("Rerun disabled - call to stdout() ignored");
            Ok(RecordingStream::disabled())
        }
    }

    /// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
    /// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over gRPC.
    ///
    /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
    /// that viewer instead of starting a new one.
    ///
    /// See also [`Self::spawn_opts`] if you wish to configure the behavior of the Rerun process
    /// as well as the underlying connection.
    ///
    /// ## Example
    ///
    /// ```no_run
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").spawn()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn spawn(self) -> RecordingStreamResult<RecordingStream> {
        self.spawn_opts(&Default::default(), crate::default_flush_timeout())
    }

    /// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
    /// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over gRPC.
    ///
    /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
    /// that viewer instead of starting a new one.
    ///
    /// The behavior of the spawned Viewer can be configured via `opts`.
    /// If you're fine with the default behavior, refer to the simpler [`Self::spawn`].
    ///
    /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
    /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
    /// call to `flush` to block indefinitely if a connection cannot be established.
    ///
    /// ## Example
    ///
    /// ```no_run
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
    ///     .spawn_opts(&re_sdk::SpawnOptions::default(), re_sdk::default_flush_timeout())?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn spawn_opts(
        self,
        opts: &crate::SpawnOptions,
        flush_timeout: Option<Duration>,
    ) -> RecordingStreamResult<RecordingStream> {
        if !self.is_enabled() {
            re_log::debug!("Rerun disabled - call to spawn() ignored");
            return Ok(RecordingStream::disabled());
        }

        let url = format!("rerun+http://{}/proxy", opts.connect_addr());

        // NOTE: If `_RERUN_TEST_FORCE_SAVE` is set, all recording streams will write to disk no matter
        // what, thus spawning a viewer is pointless (and probably not intended).
        if forced_sink_path().is_some() {
            return self.connect_grpc_opts(url, flush_timeout);
        }

        crate::spawn(opts)?;

        self.connect_grpc_opts(url, flush_timeout)
    }

    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
    /// web-based Rerun viewer via gRPC.
    ///
    /// If the `open_browser` argument is `true`, your default browser will be opened with a
    /// connected web-viewer.
    ///
    /// If not, you can connect to this server using the `rerun` binary (`cargo install rerun-cli --locked`).
    ///
    /// ## Details
    /// This method will spawn two servers: one HTTP server serving the Rerun Web Viewer `.html` and `.wasm` files,
    /// and then one gRPC server that streams the log data to the web viewer (or to a native viewer, or to multiple viewers).
    ///
    /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
    /// You can limit the amount of data buffered by the gRPC server with the `server_memory_limit` argument.
    /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
    ///
    /// ## Example
    ///
    /// ```ignore
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
    ///     .serve("0.0.0.0",
    ///            Default::default(),
    ///            Default::default(),
    ///            re_sdk::MemoryLimit::from_fraction_of_total(0.25),
    ///            true)?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    //
    // # TODO(#5531): keep static data around.
    #[cfg(feature = "web_viewer")]
    #[deprecated(
        since = "0.20.0",
        note = "use rec.serve_grpc() with rerun::serve_web_viewer() instead"
    )]
    pub fn serve(
        self,
        bind_ip: &str,
        web_port: WebViewerServerPort,
        grpc_port: u16,
        server_memory_limit: re_memory::MemoryLimit,
        open_browser: bool,
    ) -> RecordingStreamResult<RecordingStream> {
        self.serve_web(
            bind_ip,
            web_port,
            grpc_port,
            server_memory_limit,
            open_browser,
        )
    }

    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
    /// web-based Rerun viewer via gRPC.
    ///
    /// If the `open_browser` argument is `true`, your default browser will be opened with a
    /// connected web-viewer.
    ///
    /// If not, you can connect to this server using the `rerun` binary (`cargo install rerun-cli --locked`).
    ///
    /// ## Details
    /// This method will spawn two servers: one HTTP server serving the Rerun Web Viewer `.html` and `.wasm` files,
    /// and then one gRPC server that streams the log data to the web viewer (or to a native viewer, or to multiple viewers).
    ///
    /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
    /// You can limit the amount of data buffered by the gRPC server with the `server_memory_limit` argument.
    /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
    ///
    /// Calling `serve_web` is equivalent to calling [`Self::serve_grpc`] followed by [`crate::serve_web_viewer`].
    ///
    /// ## Example
    ///
    /// ```ignore
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
    ///     .serve_web("0.0.0.0",
    ///                Default::default(),
    ///                Default::default(),
    ///                re_sdk::MemoryLimit::from_fraction_of_total(0.25),
    ///                true)?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    //
    // # TODO(#5531): keep static data around.
    #[cfg(feature = "web_viewer")]
    pub fn serve_web(
        self,
        bind_ip: &str,
        web_port: WebViewerServerPort,
        grpc_port: u16,
        server_memory_limit: re_memory::MemoryLimit,
        open_browser: bool,
    ) -> RecordingStreamResult<RecordingStream> {
        let (enabled, store_info, properties, batcher_config) = self.into_args();
        if enabled {
            let sink = crate::web_viewer::new_sink(
                open_browser,
                bind_ip,
                web_port,
                grpc_port,
                server_memory_limit,
            )?;
            RecordingStream::new(store_info, properties, batcher_config, sink)
        } else {
            re_log::debug!("Rerun disabled - call to serve_web() ignored");
            Ok(RecordingStream::disabled())
        }
    }

    /// Returns whether or not logging is enabled, a [`StoreInfo`], the optional
    /// [`RecordingProperties`], and the associated batcher configuration.
    ///
    /// This can be used to then construct a [`RecordingStream`] manually using
    /// [`RecordingStream::new`].
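    ///
    /// A sketch of the manual route (this mirrors what [`Self::buffered`] does internally):
    ///
    /// ```ignore
    /// let (enabled, store_info, properties, batcher_config) =
    ///     re_sdk::RecordingStreamBuilder::new("rerun_example_app").into_args();
    /// let rec = if enabled {
    ///     re_sdk::RecordingStream::new(
    ///         store_info,
    ///         properties,
    ///         batcher_config,
    ///         Box::new(re_sdk::log_sink::BufferedSink::new()),
    ///     )?
    /// } else {
    ///     re_sdk::RecordingStream::disabled()
    /// };
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```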
    pub fn into_args(
        self,
    ) -> (
        bool,
        StoreInfo,
        Option<RecordingProperties>,
        ChunkBatcherConfig,
    ) {
        let enabled = self.is_enabled();

        let Self {
            application_id,
            store_kind,
            store_id,
            store_source,
            default_enabled: _,
            enabled: _,
            batcher_config,
            should_send_properties,
            properties,
        } = self;

        let store_id = store_id.unwrap_or_else(|| StoreId::random(store_kind));
        let store_source = store_source.unwrap_or_else(|| StoreSource::RustSdk {
            rustc_version: env!("RE_BUILD_RUSTC_VERSION").into(),
            llvm_version: env!("RE_BUILD_LLVM_VERSION").into(),
        });

        let store_info = StoreInfo {
            application_id,
            store_id,
            cloned_from: None,
            store_source,
            started: None,
            store_version: Some(re_build_info::CrateVersion::LOCAL),
        };

        let batcher_config =
            batcher_config.unwrap_or_else(|| match ChunkBatcherConfig::from_env() {
                Ok(config) => config,
                Err(err) => {
                    re_log::error!("Failed to parse ChunkBatcherConfig from env: {}", err);
                    ChunkBatcherConfig::default()
                }
            });

        (
            enabled,
            store_info,
            should_send_properties.then_some(properties),
            batcher_config,
        )
    }

    /// Internal check for whether or not logging is enabled using explicit/default settings & env var.
    fn is_enabled(&self) -> bool {
        self.enabled
            .unwrap_or_else(|| crate::decide_logging_enabled(self.default_enabled))
    }
}

// ----------------------------------------------------------------------------

/// A [`RecordingStream`] handles everything related to logging data into Rerun.
///
/// You can construct a new [`RecordingStream`] using [`RecordingStreamBuilder`] or
/// [`RecordingStream::new`].
///
/// ## Sinks
///
/// Data is logged into Rerun via [`LogSink`]s.
///
/// The underlying [`LogSink`] of a [`RecordingStream`] can be changed at any point during its
/// lifetime by calling [`RecordingStream::set_sink`] or one of the higher level helpers
/// ([`RecordingStream::connect_grpc`], [`RecordingStream::memory`],
/// [`RecordingStream::save`], [`RecordingStream::disconnect`]).
///
/// See [`RecordingStream::set_sink`] for more information.
///
/// ## Multithreading and ordering
///
/// [`RecordingStream`] can be cheaply cloned and used freely across any number of threads.
///
/// Internally, all operations are linearized into a pipeline:
/// - All operations sent by a given thread will take effect in the same exact order as that
///   thread originally sent them in, from its point of view.
/// - There isn't any well defined global order across multiple threads.
///
/// This means that e.g. flushing the pipeline ([`Self::flush_blocking`]) guarantees that all previous data sent by the calling thread
/// has been recorded and (if applicable) flushed to the underlying OS-managed file descriptor,
/// but other threads may still have data in flight.
///
/// ## Shutdown
///
/// The [`RecordingStream`] can only be shut down by dropping all instances of it, at which point
/// it will automatically take care of flushing any pending data that might remain in the pipeline.
///
/// Shutting down cannot ever block.
#[derive(Clone)]
pub struct RecordingStream {
    inner: Either<Arc<Option<RecordingStreamInner>>, Weak<Option<RecordingStreamInner>>>,
}

impl RecordingStream {
    /// Passes a reference to the [`RecordingStreamInner`], if it exists.
    ///
    /// This works whether the underlying stream is strong or weak.
    #[inline]
    fn with<F: FnOnce(&RecordingStreamInner) -> R, R>(&self, f: F) -> Option<R> {
        use std::ops::Deref as _;
        match &self.inner {
            Either::Left(strong) => strong.deref().as_ref().map(f),
            Either::Right(weak) => weak
                .upgrade()
                .and_then(|strong| strong.deref().as_ref().map(f)),
        }
    }

    /// Clones the [`RecordingStream`] without incrementing the refcount.
    ///
    /// Useful e.g. if you want to make sure that a detached thread won't prevent the [`RecordingStream`]
    /// from flushing during shutdown.
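    ///
    /// For instance, a sketch of handing a weak clone to a detached worker thread:
    ///
    /// ```no_run
    /// # fn log_from_worker(_: &re_sdk::RecordingStream) {}
    /// # let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").buffered()?;
    /// let weak = rec.clone_weak();
    /// std::thread::spawn(move || {
    ///     // This handle won't keep the recording alive once `rec` is dropped.
    ///     log_from_worker(&weak);
    /// });
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```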
    //
    // TODO(#5335): shutdown flushing behavior is too brittle.
    #[inline]
    pub fn clone_weak(&self) -> Self {
        Self {
            inner: match &self.inner {
                Either::Left(strong) => Either::Right(Arc::downgrade(strong)),
                Either::Right(weak) => Either::Right(Weak::clone(weak)),
            },
        }
    }
}

// TODO(#5335): shutdown flushing behavior is too brittle.
impl Drop for RecordingStream {
    #[inline]
    fn drop(&mut self) {
        // If this holds the last strong handle to the recording, make sure that all pending
        // `DataLoader` threads that were started from the SDK actually run to completion (they
        // all hold a weak handle to this very recording!).
        //
        // NOTE: It's very important to do so from the `Drop` implementation of `RecordingStream`
        // itself, because the dataloader threads -- by definition -- will have to send data into
        // this very recording, therefore we must make sure that at least one strong handle still lives
        // on until they are all finished.
        if let Either::Left(strong) = &mut self.inner {
            if Arc::strong_count(strong) == 1 {
                // Keep the recording alive until all dataloaders are finished.
                self.with(|inner| inner.wait_for_dataloaders());
            }
        }
    }
}

struct RecordingStreamInner {
    info: StoreInfo,
    properties: Option<RecordingProperties>,
    tick: AtomicI64,

    /// The one and only entrypoint into the pipeline: this is _never_ cloned nor publicly exposed,
    /// therefore the `Drop` implementation can rely on the fact that no more data can come in
    /// while it's running.
    cmds_tx: Sender<Command>,

    batcher: ChunkBatcher,
    batcher_to_sink_handle: Option<std::thread::JoinHandle<()>>,

    /// Keeps track of the top-level threads that were spawned in order to execute the `DataLoader`
    /// machinery in the context of this `RecordingStream`.
    ///
    /// See [`RecordingStream::log_file_from_path`] and [`RecordingStream::log_file_from_contents`].
    dataloader_handles: Mutex<Vec<std::thread::JoinHandle<()>>>,

    pid_at_creation: u32,
}

impl fmt::Debug for RecordingStreamInner {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RecordingStreamInner")
            .field("info", &self.info.store_id)
            .finish()
    }
}

impl Drop for RecordingStreamInner {
    fn drop(&mut self) {
        if self.is_forked_child() {
            re_log::error_once!(
                "Fork detected while dropping RecordingStreamInner. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
            );
            return;
        }

        self.wait_for_dataloaders();

        // NOTE: The command channel is private; if we're here, nothing is currently capable of
        // sending data down the pipeline.
        self.batcher.flush_blocking();
        self.cmds_tx.send(Command::PopPendingChunks).ok();
        self.cmds_tx.send(Command::Shutdown).ok();
        if let Some(handle) = self.batcher_to_sink_handle.take() {
            handle.join().ok();
        }
    }
}

impl RecordingStreamInner {
    fn new(
        info: StoreInfo,
        properties: Option<RecordingProperties>,
        batcher_config: ChunkBatcherConfig,
        sink: Box<dyn LogSink>,
    ) -> RecordingStreamResult<Self> {
        let on_release = batcher_config.hooks.on_release.clone();
        let batcher = ChunkBatcher::new(batcher_config)?;

        {
            re_log::debug!(
                app_id = %info.application_id,
                rec_id = %info.store_id,
                "setting recording info",
            );
            sink.send(
                re_log_types::SetStoreInfo {
                    row_id: *RowId::new(),
                    info: info.clone(),
                }
                .into(),
            );
        }

        let (cmds_tx, cmds_rx) = crossbeam::channel::unbounded();

        let batcher_to_sink_handle = {
            const NAME: &str = "RecordingStream::batcher_to_sink";
            std::thread::Builder::new()
                .name(NAME.into())
                .spawn({
                    let info = info.clone();
                    let batcher = batcher.clone();
                    move || forwarding_thread(info, sink, cmds_rx, batcher.chunks(), on_release)
                })
                .map_err(|err| RecordingStreamError::SpawnThread {
                    name: NAME.into(),
                    err,
                })?
        };

        if let Some(properties) = properties.as_ref() {
            // We pre-populate the batcher with a chunk that contains the recording
            // properties, so that these get automatically sent to the sink.

            re_log::debug!(properties = ?properties, "adding recording properties to batcher");

            let properties_chunk = Chunk::builder(EntityPath::recording_properties())
                .with_archetype(RowId::new(), TimePoint::default(), properties)
                .build()?;

            batcher.push_chunk(properties_chunk);
        }

        Ok(Self {
            info,
            properties,
            tick: AtomicI64::new(0),
            cmds_tx,
            batcher,
            batcher_to_sink_handle: Some(batcher_to_sink_handle),
            dataloader_handles: Mutex::new(Vec::new()),
            pid_at_creation: std::process::id(),
        })
    }

    #[inline]
    pub fn is_forked_child(&self) -> bool {
        self.pid_at_creation != std::process::id()
    }

    /// Make sure all pending top-level `DataLoader` threads that were started from the SDK run to completion.
    //
    // TODO(cmc): At some point we might want to make it configurable, though I cannot really
    // think of a use case where you'd want to drop those threads immediately upon
    // disconnection.
    fn wait_for_dataloaders(&self) {
        let dataloader_handles = std::mem::take(&mut *self.dataloader_handles.lock());
        for handle in dataloader_handles {
            handle.join().ok();
        }
    }
}

enum Command {
    RecordMsg(LogMsg),
    SwapSink(Box<dyn LogSink>),
    Flush(Sender<()>),
    PopPendingChunks,
    Shutdown,
}

impl Command {
    fn flush() -> (Self, Receiver<()>) {
        let (tx, rx) = crossbeam::channel::bounded(0); // oneshot
        (Self::Flush(tx), rx)
    }
}

impl RecordingStream {
    /// Creates a new [`RecordingStream`] with a given [`StoreInfo`] and [`LogSink`].
    ///
    /// You can create a [`StoreInfo`] with [`crate::new_store_info`].
    ///
    /// The [`StoreInfo`] is immediately sent to the sink in the form of a
    /// [`re_log_types::SetStoreInfo`].
    ///
    /// You can find sinks in [`crate::sink`].
    ///
    /// See also: [`RecordingStreamBuilder`].
    #[must_use = "Recording will get closed automatically once all instances of this object have been dropped"]
    pub fn new(
        info: StoreInfo,
        properties: Option<RecordingProperties>,
        batcher_config: ChunkBatcherConfig,
        sink: Box<dyn LogSink>,
    ) -> RecordingStreamResult<Self> {
        let sink = (info.store_id.kind == StoreKind::Recording)
            .then(forced_sink_path)
            .flatten()
            .map_or(sink, |path| {
                re_log::info!("Forcing FileSink because of env-var {ENV_FORCE_SAVE}={path:?}");
                // `unwrap` is ok since forced sinks are only used in tests.
                Box::new(crate::sink::FileSink::new(path).unwrap()) as Box<dyn LogSink>
            });

        let stream =
            RecordingStreamInner::new(info, properties, batcher_config, sink).map(|inner| {
                Self {
                    inner: Either::Left(Arc::new(Some(inner))),
                }
            })?;

        Ok(stream)
    }

    /// Creates a new no-op [`RecordingStream`] that drops all logging messages, doesn't allocate
    /// any memory and doesn't spawn any threads.
    ///
    /// [`Self::is_enabled`] will return `false`.
    pub fn disabled() -> Self {
        Self {
            inner: Either::Left(Arc::new(None)),
        }
    }
}

impl RecordingStream {
    /// Log data to Rerun.
    ///
    /// This is the main entry point for logging data to Rerun. It can be used to log anything
    /// that implements [`AsComponents`], such as any [archetype](https://docs.rs/rerun/latest/rerun/archetypes/index.html)
    /// or individual [component](https://docs.rs/rerun/latest/rerun/components/index.html).
    ///
    /// The data will be timestamped automatically based on the [`RecordingStream`]'s internal clock.
    /// See [`RecordingStream::set_time_sequence`] etc. for more information.
    ///
    /// The entity path can either be a string
    /// (with special characters escaped, split on unescaped slashes)
    /// or an [`EntityPath`] constructed with [`crate::entity_path`].
    /// See <https://www.rerun.io/docs/concepts/entity-path> for more on entity paths.
    ///
    /// See also: [`Self::log_static`] for logging static data.
    ///
    /// Internally, the stream will automatically micro-batch multiple log calls to optimize
    /// transport.
    /// See [SDK Micro Batching] for more information.
    ///
    /// # Example:
    /// ```ignore
    /// # use rerun;
    /// # let (rec, storage) = rerun::RecordingStreamBuilder::new("rerun_example_points3d_simple").memory()?;
    /// rec.log(
    ///     "my/points",
    ///     &rerun::Points3D::new([(0.0, 0.0, 0.0), (1.0, 1.0, 1.0)]),
    /// )?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    ///
    /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
    /// [component bundle]: [`AsComponents`]
    #[inline]
    pub fn log<AS: ?Sized + AsComponents>(
        &self,
        ent_path: impl Into<EntityPath>,
        as_components: &AS,
    ) -> RecordingStreamResult<()> {
        self.log_with_static(ent_path, false, as_components)
    }

    /// Lower-level logging API to provide data spanning multiple timepoints.
    ///
    /// Unlike the regular `log` API, which is row-oriented, this API lets you submit the data
    /// in a columnar form. The lengths of all of the [`TimeColumn`]s and the component columns
    /// must match. All data that occurs at the same index across the different index/time and
    /// component arrays will act as a single logical row.
    ///
    /// Note that this API ignores any stateful index/time set on the log stream via the
    /// [`Self::set_time`]/[`Self::set_timepoint`]/[`Self::set_time_nanos`]/etc. APIs.
    /// Furthermore, this will _not_ inject the default `log_tick` and `log_time` timeline columns.
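    ///
    /// A sketch of columnar logging, assuming the higher-level `rerun` crate's helpers for
    /// turning an archetype into per-component columns (`TimeColumn::new_sequence` matches the
    /// [`TimeColumn`] API; `Scalars::new(..).columns_of_unit_batches()` is an assumption about
    /// the generated archetype helpers):
    ///
    /// ```ignore
    /// let times = rerun::TimeColumn::new_sequence("step", 0..64);
    /// let scalars = (0..64).map(|step| (step as f64 / 10.0).sin());
    /// rec.send_columns(
    ///     "scalars",
    ///     [times],
    ///     rerun::Scalars::new(scalars).columns_of_unit_batches()?,
    /// )?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```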
    pub fn send_columns(
        &self,
        ent_path: impl Into<EntityPath>,
        indexes: impl IntoIterator<Item = TimeColumn>,
        columns: impl IntoIterator<Item = SerializedComponentColumn>,
    ) -> RecordingStreamResult<()> {
        let id = ChunkId::new();

        let indexes = indexes
            .into_iter()
            .map(|col| (*col.timeline().name(), col))
            .collect();

        let components: ChunkComponents = columns
            .into_iter()
            .map(|column| (column.descriptor, column.list_array))
            .collect();

        let chunk = Chunk::from_auto_row_ids(id, ent_path.into(), indexes, components)?;

        self.send_chunk(chunk);

        Ok(())
    }

    /// Log data to Rerun.
    ///
    /// It can be used to log anything
    /// that implements [`AsComponents`], such as any [archetype](https://docs.rs/rerun/latest/rerun/archetypes/index.html)
    /// or individual [component](https://docs.rs/rerun/latest/rerun/components/index.html).
    ///
    /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
    /// any temporal data of the same type.
    /// All timestamp data associated with this message will be dropped right before sending it to Rerun.
    ///
    /// This is most often used for [`rerun::ViewCoordinates`](https://docs.rs/rerun/latest/rerun/archetypes/struct.ViewCoordinates.html) and
    /// [`rerun::AnnotationContext`](https://docs.rs/rerun/latest/rerun/archetypes/struct.AnnotationContext.html).
    ///
    /// Internally, the stream will automatically micro-batch multiple log calls to optimize
    /// transport.
    /// See [SDK Micro Batching] for more information.
    ///
    /// See also [`Self::log`].
    ///
    /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
    /// [component bundle]: [`AsComponents`]
    #[inline]
    pub fn log_static<AS: ?Sized + AsComponents>(
        &self,
        ent_path: impl Into<EntityPath>,
        as_components: &AS,
    ) -> RecordingStreamResult<()> {
        self.log_with_static(ent_path, true, as_components)
    }

    /// Logs the contents of a [component bundle] into Rerun.
    ///
    /// If `static_` is set to `true`, all timestamp data associated with this message will be
    /// dropped right before sending it to Rerun.
    /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
    /// any temporal data of the same type.
    ///
    /// Otherwise, the data will be timestamped automatically based on the [`RecordingStream`]'s
    /// internal clock.
    /// See `RecordingStream::set_time_*` family of methods for more information.
    ///
    /// The entity path can either be a string
    /// (with special characters escaped, split on unescaped slashes)
    /// or an [`EntityPath`] constructed with [`crate::entity_path`].
    /// See <https://www.rerun.io/docs/concepts/entity-path> for more on entity paths.
    ///
    /// Internally, the stream will automatically micro-batch multiple log calls to optimize
    /// transport.
    /// See [SDK Micro Batching] for more information.
    ///
    /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
    /// [component bundle]: [`AsComponents`]
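    ///
    /// # Example:
    ///
    /// A sketch using an archetype from the higher-level `rerun` crate:
    ///
    /// ```ignore
    /// # let (rec, storage) = rerun::RecordingStreamBuilder::new("rerun_example_app").memory()?;
    /// // Equivalent to `rec.log_static("world/annotations", &…)`:
    /// rec.log_with_static(
    ///     "world/annotations",
    ///     true,
    ///     &rerun::AnnotationContext::new([(0, "background"), (1, "person")]),
    /// )?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```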
    #[inline]
    pub fn log_with_static<AS: ?Sized + AsComponents>(
        &self,
        ent_path: impl Into<EntityPath>,
        static_: bool,
        as_components: &AS,
    ) -> RecordingStreamResult<()> {
        let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
        self.log_serialized_batches_impl(
            row_id,
            ent_path,
            static_,
            as_components.as_serialized_batches(),
        )
    }

    /// Logs a set of [`SerializedComponentBatch`]es into Rerun.
    ///
    /// If `static_` is set to `true`, all timestamp data associated with this message will be
    /// dropped right before sending it to Rerun.
    /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
    /// any temporal data of the same type.
    ///
    /// Otherwise, the data will be timestamped automatically based on the [`RecordingStream`]'s
    /// internal clock.
    /// See `RecordingStream::set_time_*` family of methods for more information.
    ///
    /// The number of instances will be determined by the longest batch in the bundle.
    ///
    /// The entity path can either be a string
    /// (with special characters escaped, split on unescaped slashes)
    /// or an [`EntityPath`] constructed with [`crate::entity_path`].
    /// See <https://www.rerun.io/docs/concepts/entity-path> for more on entity paths.
    ///
    /// Internally, the stream will automatically micro-batch multiple log calls to optimize
    /// transport.
    /// See [SDK Micro Batching] for more information.
    ///
    /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
    ///
    /// [`SerializedComponentBatch`]: [re_types_core::SerializedComponentBatch]
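    ///
    /// A sketch that forwards the serialized form of an archetype by hand (this is what
    /// [`Self::log_with_static`] does for you; the `Points3D` archetype comes from the
    /// higher-level `rerun` crate):
    ///
    /// ```ignore
    /// use re_types::AsComponents as _;
    /// let points = rerun::Points3D::new([(0.0, 0.0, 0.0), (1.0, 1.0, 1.0)]);
    /// rec.log_serialized_batches("my/points", false, points.as_serialized_batches())?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```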
    pub fn log_serialized_batches(
        &self,
        ent_path: impl Into<EntityPath>,
        static_: bool,
        comp_batches: impl IntoIterator<Item = re_types::SerializedComponentBatch>,
    ) -> RecordingStreamResult<()> {
        let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
        self.log_serialized_batches_impl(row_id, ent_path, static_, comp_batches)
    }

    /// Sends a property to the recording.
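    ///
    /// Properties are named values logged statically under the recording's properties path;
    /// any [`AsComponents`] implementor works as the value (the `Points3D` archetype below is
    /// purely a stand-in):
    ///
    /// ```ignore
    /// rec.send_property(
    ///     "calibration",
    ///     &rerun::Points3D::new([(0.0, 0.0, 0.0)]),
    /// )?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```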
    #[inline]
    pub fn send_property<AS: ?Sized + AsComponents>(
        &self,
        name: impl Into<String>,
        values: &AS,
    ) -> RecordingStreamResult<()> {
        let sub_path = EntityPath::from(name.into());
        self.log_static(EntityPath::properties().join(&sub_path), values)
    }

    /// Sends the name of the recording.
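    ///
    /// For example:
    ///
    /// ```no_run
    /// # let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").buffered()?;
    /// rec.send_recording_name("live capture, take 3")?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```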
    #[inline]
    pub fn send_recording_name(&self, name: impl Into<String>) -> RecordingStreamResult<()> {
        let update = RecordingProperties::update_fields().with_name(name.into());
        self.log_static(EntityPath::recording_properties(), &update)
    }

    /// Sends the start time of the recording.
    #[inline]
    pub fn send_recording_start_time(
        &self,
        timestamp: impl Into<Timestamp>,
    ) -> RecordingStreamResult<()> {
        let update = RecordingProperties::update_fields().with_start_time(timestamp.into());
        self.log_static(EntityPath::recording_properties(), &update)
    }

    // NOTE: For backward and forward compatibility reasons, we need our logging APIs to be
    // fallible, even though they really aren't at the moment.
    #[allow(clippy::unnecessary_wraps)]
    fn log_serialized_batches_impl(
        &self,
        row_id: RowId,
        entity_path: impl Into<EntityPath>,
        static_: bool,
        comp_batches: impl IntoIterator<Item = re_types::SerializedComponentBatch>,
    ) -> RecordingStreamResult<()> {
        if !self.is_enabled() {
            return Ok(()); // silently drop the message
        }

        let entity_path = entity_path.into();

        let comp_batches: Vec<_> = comp_batches
            .into_iter()
            .map(|comp_batch| (comp_batch.descriptor, comp_batch.array))
            .collect();
        let components: IntMap<_, _> = comp_batches.into_iter().collect();

        // NOTE: The timepoint is irrelevant; the `RecordingStream` will overwrite it using its
        // internal clock.
        let timepoint = TimePoint::default();

        if !components.is_empty() {
            let row = PendingRow {
                row_id,
                timepoint,
                components,
            };
            self.record_row(entity_path, row, !static_);
        }

        Ok(())
    }

    /// Logs the file at the given `path` using all [`re_data_loader::DataLoader`]s available.
    ///
    /// A single `path` might be handled by more than one loader.
    ///
    /// This method blocks until either at least one [`re_data_loader::DataLoader`] starts
    /// streaming data in or all of them fail.
    ///
    /// See <https://www.rerun.io/docs/reference/data-loaders/overview> for more information.
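    ///
    /// For example (the file name is purely illustrative; any format with a registered
    /// loader works):
    ///
    /// ```ignore
    /// rec.log_file_from_path("point_cloud.ply", None, false)?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```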
1316    #[cfg(feature = "data_loaders")]
1317    pub fn log_file_from_path(
1318        &self,
1319        filepath: impl AsRef<std::path::Path>,
1320        entity_path_prefix: Option<EntityPath>,
1321        static_: bool,
1322    ) -> RecordingStreamResult<()> {
1323        self.log_file(filepath, None, entity_path_prefix, static_, true)
1324    }
1325
1326    /// Logs the given `contents` using all [`re_data_loader::DataLoader`]s available.
1327    ///
1328    /// A single `path` might be handled by more than one loader.
1329    ///
1330    /// This method blocks until either at least one [`re_data_loader::DataLoader`] starts
1331    /// streaming data in or all of them fail.
1332    ///
1333    /// See <https://www.rerun.io/docs/reference/data-loaders/overview> for more information.
    #[cfg(feature = "data_loaders")]
    pub fn log_file_from_contents(
        &self,
        filepath: impl AsRef<std::path::Path>,
        contents: std::borrow::Cow<'_, [u8]>,
        entity_path_prefix: Option<EntityPath>,
        static_: bool,
    ) -> RecordingStreamResult<()> {
        self.log_file(filepath, Some(contents), entity_path_prefix, static_, true)
    }

    /// If `prefer_current_recording` is set (which is always the case for now), the dataloader settings
    /// will be configured as if the current SDK recording is the currently opened recording.
    /// Most dataloaders prefer logging to the currently opened recording if one is set.
    #[cfg(feature = "data_loaders")]
    fn log_file(
        &self,
        filepath: impl AsRef<std::path::Path>,
        contents: Option<std::borrow::Cow<'_, [u8]>>,
        entity_path_prefix: Option<EntityPath>,
        static_: bool,
        prefer_current_recording: bool,
    ) -> RecordingStreamResult<()> {
        let Some(store_info) = self.store_info() else {
            re_log::warn!(
                "Ignored call to log_file() because RecordingStream has not been properly initialized"
            );
            return Ok(());
        };

        let filepath = filepath.as_ref();
        let has_contents = contents.is_some();

        let (tx, rx) = re_smart_channel::smart_channel(
            re_smart_channel::SmartMessageSource::Sdk,
            re_smart_channel::SmartChannelSource::File(filepath.into()),
        );

        let mut settings = crate::DataLoaderSettings {
            application_id: Some(store_info.application_id.clone()),
            opened_application_id: None,
            store_id: store_info.store_id.clone(),
            opened_store_id: None,
            force_store_info: false,
            entity_path_prefix,
            timepoint: (!static_).then(|| {
                self.with(|inner| {
                    // Get the current time on all timelines, for the current recording, on the current
                    // thread…
                    let mut now = self.now();

                    // …and then also inject the current recording tick into it.
                    let tick = inner
                        .tick
                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    now.insert_cell(TimelineName::log_tick(), TimeCell::from_sequence(tick));

                    now
                })
                .unwrap_or_default()
            }),
        };

        if prefer_current_recording {
            settings.opened_application_id = Some(store_info.application_id.clone());
            settings.opened_store_id = Some(store_info.store_id);
        }

        if let Some(contents) = contents {
            re_data_loader::load_from_file_contents(
                &settings,
                re_log_types::FileSource::Sdk,
                filepath,
                contents,
                &tx,
            )?;
        } else {
            re_data_loader::load_from_path(
                &settings,
                re_log_types::FileSource::Sdk,
                filepath,
                &tx,
            )?;
        }
        drop(tx);

        // We can safely ignore the error on `recv()` as we're in complete control of both ends of
        // the channel.
        let thread_name = if has_contents {
            format!("log_file_from_contents({filepath:?})")
        } else {
            format!("log_file_from_path({filepath:?})")
        };
        let handle = std::thread::Builder::new()
            .name(thread_name.clone())
            .spawn({
                let this = self.clone_weak();
                move || {
                    while let Some(msg) = rx.recv().ok().and_then(|msg| msg.into_data()) {
                        this.record_msg(msg);
                    }
                }
            })
            .map_err(|err| RecordingStreamError::SpawnThread {
                name: thread_name,
                err,
            })?;

        self.with(|inner| inner.dataloader_handles.lock().push(handle));

        Ok(())
    }
}

#[allow(clippy::needless_pass_by_value)]
fn forwarding_thread(
    info: StoreInfo,
    mut sink: Box<dyn LogSink>,
    cmds_rx: Receiver<Command>,
    chunks: Receiver<Chunk>,
    on_release: Option<ArrowRecordBatchReleaseCallback>,
) {
    /// Returns `true` to indicate that processing can continue; i.e. `false` means immediate
    /// shutdown.
    fn handle_cmd(info: &StoreInfo, cmd: Command, sink: &mut Box<dyn LogSink>) -> bool {
        match cmd {
            Command::RecordMsg(msg) => {
                sink.send(msg);
            }
            Command::SwapSink(new_sink) => {
                re_log::trace!("Swapping sink…");
                let backlog = {
                    // Capture the backlog if it exists.
                    let backlog = sink.drain_backlog();

                    // Flush the underlying sink if possible.
                    sink.drop_if_disconnected();
                    sink.flush_blocking();

                    backlog
                };

                // Send the recording info to the new sink. This is idempotent.
                {
                    re_log::debug!(
                        app_id = %info.application_id,
                        rec_id = %info.store_id,
                        "setting recording info",
                    );
                    new_sink.send(
                        re_log_types::SetStoreInfo {
                            row_id: *RowId::new(),
                            info: info.clone(),
                        }
                        .into(),
                    );
                    new_sink.send_all(backlog);
                }

                *sink = new_sink;
            }
            Command::Flush(oneshot) => {
                re_log::trace!("Flushing…");
                // Flush the underlying sink if possible.
                sink.drop_if_disconnected();
                sink.flush_blocking();
                drop(oneshot); // signals the oneshot
            }
            Command::PopPendingChunks => {
                // Wake up and skip the current iteration so that we can drain all pending chunks
                // before handling the next command.
            }
            Command::Shutdown => return false,
        }

        true
    }

    use crossbeam::select;
    loop {
        // NOTE: Always pop chunks first, this is what makes `Command::PopPendingChunks` possible,
        // which in turn makes `RecordingStream::flush_blocking` well-defined.
        while let Ok(chunk) = chunks.try_recv() {
            let mut msg = match chunk.to_arrow_msg() {
                Ok(msg) => msg,
                Err(err) => {
                    re_log::error!(%err, "couldn't serialize chunk; data dropped (this is a bug in Rerun!)");
                    continue;
                }
            };
            msg.on_release = on_release.clone();
            sink.send(LogMsg::ArrowMsg(info.store_id.clone(), msg));
        }

        select! {
            recv(chunks) -> res => {
                let Ok(chunk) = res else {
                    // The batcher is gone, which can only happen if the `RecordingStream` itself
                    // has been dropped.
                    re_log::trace!("Shutting down forwarding_thread: batcher is gone");
                    break;
                };

                // Attach the release callback here too, so that *all* Arrow messages get it,
                // not just the ones drained via `try_recv` above.
                let mut msg = match chunk.to_arrow_msg() {
                    Ok(msg) => msg,
                    Err(err) => {
                        re_log::error!(%err, "couldn't serialize chunk; data dropped (this is a bug in Rerun!)");
                        continue;
                    }
                };
                msg.on_release = on_release.clone();

                sink.send(LogMsg::ArrowMsg(info.store_id.clone(), msg));
            }

            recv(cmds_rx) -> res => {
                let Ok(cmd) = res else {
                    // All command senders are gone, which can only happen if the
                    // `RecordingStream` itself has been dropped.
                    re_log::trace!("Shutting down forwarding_thread: all command senders are gone");
                    break;
                };
                if !handle_cmd(&info, cmd, &mut sink) {
                    break; // shutdown
                }
            }
        }
    }

    // NOTE: The receiving end of the command stream is owned solely by this thread.
    // Past this point, all command writes will return `ErrDisconnected`.
}

impl RecordingStream {
    /// Check if logging is enabled on this `RecordingStream`.
    ///
    /// If not, all recording calls will be ignored.
    #[inline]
    pub fn is_enabled(&self) -> bool {
        self.with(|_| true).unwrap_or(false)
    }

    /// The [`StoreInfo`] associated with this `RecordingStream`.
    #[inline]
    pub fn store_info(&self) -> Option<StoreInfo> {
        self.with(|inner| inner.info.clone())
    }

    /// Determine whether a fork has happened since creating this `RecordingStream`. In general, this means our
    /// batcher/sink threads are gone and all data logged since the fork has been dropped.
    ///
    /// It is essential that [`crate::cleanup_if_forked_child`] be called after forking the process. SDK implementations
    /// should do this during their initialization phase.
    #[inline]
    pub fn is_forked_child(&self) -> bool {
        self.with(|inner| inner.is_forked_child()).unwrap_or(false)
    }
}

impl RecordingStream {
    /// Records an arbitrary [`LogMsg`].
    #[inline]
    pub fn record_msg(&self, msg: LogMsg) {
        let f = move |inner: &RecordingStreamInner| {
            // NOTE: Internal channels can never be closed outside of the `Drop` impl, this send cannot
            // fail.
            inner.cmds_tx.send(Command::RecordMsg(msg)).ok();
            inner
                .tick
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        };

        if self.with(f).is_none() {
            re_log::warn_once!("Recording disabled - call to record_msg() ignored");
        }
    }

    /// Records a single [`PendingRow`].
    ///
    /// If `inject_time` is set to `true`, the row's timestamp data will be overridden using the
    /// [`RecordingStream`]'s internal clock.
    ///
    /// Internally, incoming [`PendingRow`]s are automatically coalesced into larger [`Chunk`]s to
    /// optimize for transport.
    #[inline]
    pub fn record_row(&self, entity_path: EntityPath, mut row: PendingRow, inject_time: bool) {
        let f = move |inner: &RecordingStreamInner| {
            // NOTE: We're incrementing the current tick still.
            let tick = inner
                .tick
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            if inject_time {
                // Get the current time on all timelines, for the current recording, on the current
                // thread…
                let mut now = self.now();
                // …and then also inject the current recording tick into it.
                now.insert_cell(TimelineName::log_tick(), TimeCell::from_sequence(tick));

                // Inject all these times into the row, overriding conflicting times, if any.
                for (timeline, cell) in now {
                    row.timepoint.insert_cell(timeline, cell);
                }
            }

            inner.batcher.push_row(entity_path, row);
        };

        if self.with(f).is_none() {
            re_log::warn_once!("Recording disabled - call to record_row() ignored");
        }
    }

    /// Logs a single [`Chunk`].
    ///
    /// Will inject `log_tick` and `log_time` timeline columns into the chunk.
    /// If you don't want to inject these, use [`Self::send_chunk`] instead.
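    ///
    /// Example (a minimal sketch; assumes the `re_chunk` chunk-builder API and a
    /// pre-built archetype value — both are illustrative, not verbatim):
    /// ```ignore
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// # let points: rerun::archetypes::Points3D = unimplemented!(); // hypothetical archetype value
    /// let chunk = re_chunk::Chunk::builder("points".into())
    ///     .with_archetype(re_chunk::RowId::new(), rerun::TimePoint::default(), &points)
    ///     .build()?;
    /// rec.log_chunk(chunk); // `log_time`/`log_tick` get injected here
    /// ```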
    #[inline]
    pub fn log_chunk(&self, mut chunk: Chunk) {
        let f = move |inner: &RecordingStreamInner| {
            // TODO(cmc): Repeating these values is pretty wasteful. Would be nice to have a way of
            // indicating these are fixed across the whole chunk.
            // Inject the log time
            {
                let time_timeline = Timeline::log_time();
                let time =
                    TimeInt::new_temporal(re_log_types::Timestamp::now().nanos_since_epoch());

                let repeated_time = std::iter::repeat(time.as_i64())
                    .take(chunk.num_rows())
                    .collect();

                let time_column = TimeColumn::new(Some(true), time_timeline, repeated_time);

                if let Err(err) = chunk.add_timeline(time_column) {
                    re_log::error!(
                        "Couldn't inject '{}' timeline into chunk (this is a bug in Rerun!): {}",
                        time_timeline.name(),
                        err
                    );
                    return;
                }
            }
            // Inject the log tick
            {
                let tick_timeline = Timeline::log_tick();

                let tick = inner
                    .tick
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

                let repeated_tick = std::iter::repeat(tick).take(chunk.num_rows()).collect();

                let tick_column = TimeColumn::new(Some(true), tick_timeline, repeated_tick);

                if let Err(err) = chunk.add_timeline(tick_column) {
                    re_log::error!(
                        "Couldn't inject '{}' timeline into chunk (this is a bug in Rerun!): {}",
                        tick_timeline.name(),
                        err
                    );
                    return;
                }
            }

            inner.batcher.push_chunk(chunk);
        };

        if self.with(f).is_none() {
            re_log::warn_once!("Recording disabled - call to log_chunk() ignored");
        }
    }

    /// Logs multiple [`Chunk`]s.
    ///
    /// This _will_ inject `log_tick` and `log_time` timeline columns into each chunk.
    /// If you don't want to inject these, use [`Self::send_chunks`] instead.
    pub fn log_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
        for chunk in chunks {
            self.log_chunk(chunk);
        }
    }

    /// Records a single [`Chunk`].
    ///
    /// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunk.
    /// If you want them injected, use [`Self::log_chunk`] instead.
    #[inline]
    pub fn send_chunk(&self, chunk: Chunk) {
        let f = move |inner: &RecordingStreamInner| {
            inner.batcher.push_chunk(chunk);
        };

        if self.with(f).is_none() {
            re_log::warn_once!("Recording disabled - call to send_chunk() ignored");
        }
    }

    /// Records multiple [`Chunk`]s.
    ///
    /// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunks.
    /// If you want them injected, use [`Self::log_chunks`] instead.
    pub fn send_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
        for chunk in chunks {
            self.send_chunk(chunk);
        }
    }

    /// Swaps the underlying sink for a new one.
    ///
    /// This guarantees that:
    /// 1. all pending rows and chunks are batched, collected and sent down the current sink,
    /// 2. the current sink is flushed if it has pending data in its buffers,
    /// 3. the current sink's backlog, if there's any, is forwarded to the new sink.
    ///
    /// When this function returns, the calling thread is guaranteed that all future record calls
    /// will end up in the new sink.
    ///
    /// ## Data loss
    ///
    /// If the current sink is in a broken state (e.g. a gRPC sink with a broken connection that
    /// cannot be repaired), all pending data in its buffers will be dropped.
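    ///
    /// Example (a minimal sketch; swaps to a plain in-memory buffer):
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// // All future log calls now accumulate in an in-memory buffer.
    /// rec.set_sink(Box::new(rerun::sink::BufferedSink::new()));
    /// ```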
    pub fn set_sink(&self, sink: Box<dyn LogSink>) {
        if self.is_forked_child() {
            re_log::error_once!(
                "Fork detected during set_sink. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
            );
            return;
        }

        let f = move |inner: &RecordingStreamInner| {
            // NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends
            // are safe.

            // 1. Flush the batcher down the chunk channel
            inner.batcher.flush_blocking();

            // 2. Receive pending chunks from the batcher's channel
            inner.cmds_tx.send(Command::PopPendingChunks).ok();

            // 3. Swap the sink, which will internally make sure to re-ingest the backlog if needed
            inner.cmds_tx.send(Command::SwapSink(sink)).ok();

            // 4. Before we give control back to the caller, we need to make sure that the swap has
            //    taken place: we don't want the user to send data to the old sink!
            re_log::trace!("Waiting for sink swap to complete…");
            let (cmd, oneshot) = Command::flush();
            inner.cmds_tx.send(cmd).ok();
            oneshot.recv().ok();
            re_log::trace!("Sink swap completed.");
        };

        if self.with(f).is_none() {
            re_log::warn_once!("Recording disabled - call to set_sink() ignored");
        }
    }

    /// Initiates a flush of the pipeline and returns immediately.
    ///
    /// This does **not** wait for the flush to propagate (see [`Self::flush_blocking`]).
    /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
    pub fn flush_async(&self) {
        if self.is_forked_child() {
            re_log::error_once!(
                "Fork detected during flush_async. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
            );
            return;
        }

        let f = move |inner: &RecordingStreamInner| {
            // NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends
            // are safe.

            // 1. Synchronously flush the batcher down the chunk channel
            //
            // NOTE: This _has_ to be done synchronously as we need to be guaranteed that all chunks
            // are ready to be drained by the time this call returns.
            // It cannot block indefinitely and is fairly fast as it only requires compute (no I/O).
            inner.batcher.flush_blocking();

            // 2. Drain all pending chunks from the batcher's channel _before_ any other future command
            inner.cmds_tx.send(Command::PopPendingChunks).ok();

            // 3. Asynchronously flush everything down the sink
            let (cmd, _) = Command::flush();
            inner.cmds_tx.send(cmd).ok();
        };

        if self.with(f).is_none() {
            re_log::warn_once!("Recording disabled - call to flush_async() ignored");
        }
    }

    /// Initiates a flush of the batching pipeline and waits for it to propagate.
    ///
    /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
    pub fn flush_blocking(&self) {
        if self.is_forked_child() {
            re_log::error_once!(
                "Fork detected during flush. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
            );
            return;
        }

        let f = move |inner: &RecordingStreamInner| {
            // NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends
            // are safe.

            // 1. Flush the batcher down the chunk channel
            inner.batcher.flush_blocking();

            // 2. Drain all pending chunks from the batcher's channel _before_ any other future command
            inner.cmds_tx.send(Command::PopPendingChunks).ok();

            // 3. Wait for all chunks to have been forwarded down the sink
            let (cmd, oneshot) = Command::flush();
            inner.cmds_tx.send(cmd).ok();
            oneshot.recv().ok();
        };

        if self.with(f).is_none() {
            re_log::warn_once!("Recording disabled - call to flush_blocking() ignored");
        }
    }
}

impl RecordingStream {
    /// Swaps the underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to use
    /// the specified address.
    ///
    /// See also [`Self::connect_grpc_opts`] if you wish to configure the connection.
    ///
    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
    /// terms of data durability and ordering.
    /// See [`Self::set_sink`] for more information.
    pub fn connect_grpc(&self) -> RecordingStreamResult<()> {
        self.connect_grpc_opts(
            format!(
                "rerun+http://127.0.0.1:{}/proxy",
                re_grpc_server::DEFAULT_SERVER_PORT
            ),
            crate::default_flush_timeout(),
        )
    }

    /// Swaps the underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to use
    /// the specified address.
    ///
    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
    /// terms of data durability and ordering.
    /// See [`Self::set_sink`] for more information.
    ///
    /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
    /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
    /// call to `flush` to block indefinitely if a connection cannot be established.
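    ///
    /// Example (a minimal sketch; the URL and timeout are illustrative):
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// rec.connect_grpc_opts(
    ///     "rerun+http://127.0.0.1:9876/proxy",
    ///     Some(std::time::Duration::from_secs(2)),
    /// ).unwrap();
    /// ```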
    pub fn connect_grpc_opts(
        &self,
        url: impl Into<String>,
        flush_timeout: Option<Duration>,
    ) -> RecordingStreamResult<()> {
        if forced_sink_path().is_some() {
            re_log::debug!("Ignored setting new GrpcSink since {ENV_FORCE_SAVE} is set");
            return Ok(());
        }

        let url: String = url.into();
        let re_uri::RedapUri::Proxy(uri) = url.as_str().parse()? else {
            return Err(RecordingStreamError::NotAProxyEndpoint);
        };

        let sink = crate::log_sink::GrpcSink::new(uri, flush_timeout);

        self.set_sink(Box::new(sink));
        Ok(())
    }

    #[cfg(feature = "server")]
    /// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] pre-configured to listen on
    /// `rerun+http://127.0.0.1:9876/proxy`.
    ///
    /// To configure the gRPC server's IP and port, use [`Self::serve_grpc_opts`] instead.
    ///
    /// You can connect a viewer to it with `rerun --connect`.
    ///
    /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
    /// You can limit the amount of data buffered by the gRPC server with the `server_memory_limit` argument.
    /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
    pub fn serve_grpc(
        &self,
        server_memory_limit: re_memory::MemoryLimit,
    ) -> RecordingStreamResult<()> {
        self.serve_grpc_opts("0.0.0.0", crate::DEFAULT_SERVER_PORT, server_memory_limit)
    }

    #[cfg(feature = "server")]
    /// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] pre-configured to listen on
    /// `rerun+http://{bind_ip}:{port}/proxy`.
    ///
    /// `0.0.0.0` is a good default for `bind_ip`.
    ///
    /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
    /// You can limit the amount of data buffered by the gRPC server with the `server_memory_limit` argument.
    /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
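    ///
    /// Example (a minimal sketch; requires the `server` feature, and assumes
    /// `re_memory::MemoryLimit::from_fraction_of_total` is available):
    /// ```ignore
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// // Buffer at most 25% of total RAM on the server side.
    /// rec.serve_grpc_opts("0.0.0.0", 9876, re_memory::MemoryLimit::from_fraction_of_total(0.25))?;
    /// ```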
    pub fn serve_grpc_opts(
        &self,
        bind_ip: impl AsRef<str>,
        port: u16,
        server_memory_limit: re_memory::MemoryLimit,
    ) -> RecordingStreamResult<()> {
        if forced_sink_path().is_some() {
            re_log::debug!("Ignored setting GrpcServerSink since {ENV_FORCE_SAVE} is set");
            return Ok(());
        }

        let sink =
            crate::grpc_server::GrpcServerSink::new(bind_ip.as_ref(), port, server_memory_limit)?;

        self.set_sink(Box::new(sink));
        Ok(())
    }

    /// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
    /// underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to send data to that
    /// new process.
    ///
    /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
    /// that viewer instead of starting a new one.
    ///
    /// See also [`Self::spawn_opts`] if you wish to configure the behavior of the Rerun process
    /// as well as the underlying connection.
    ///
    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
    /// terms of data durability and ordering.
    /// See [`Self::set_sink`] for more information.
    pub fn spawn(&self) -> RecordingStreamResult<()> {
        self.spawn_opts(&Default::default(), crate::default_flush_timeout())
    }

    /// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
    /// underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to send data to that
    /// new process.
    ///
    /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
    /// that viewer instead of starting a new one.
    ///
    /// The behavior of the spawned Viewer can be configured via `opts`.
    /// If you're fine with the default behavior, refer to the simpler [`Self::spawn`].
    ///
    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
    /// terms of data durability and ordering.
    /// See [`Self::set_sink`] for more information.
    ///
    /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
    /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
    /// call to `flush` to block indefinitely if a connection cannot be established.
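    ///
    /// Example (a minimal sketch; default options plus a 2-second flush timeout):
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// rec.spawn_opts(
    ///     &rerun::SpawnOptions::default(),
    ///     Some(std::time::Duration::from_secs(2)),
    /// ).unwrap();
    /// ```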
    pub fn spawn_opts(
        &self,
        opts: &crate::SpawnOptions,
        flush_timeout: Option<Duration>,
    ) -> RecordingStreamResult<()> {
        if !self.is_enabled() {
            re_log::debug!("Rerun disabled - call to spawn() ignored");
            return Ok(());
        }
        if forced_sink_path().is_some() {
            re_log::debug!("Ignored setting new GrpcSink since {ENV_FORCE_SAVE} is set");
            return Ok(());
        }

        crate::spawn(opts)?;

        self.connect_grpc_opts(
            format!("rerun+http://{}/proxy", opts.connect_addr()),
            flush_timeout,
        )?;

        Ok(())
    }

    /// Swaps the underlying sink for a [`crate::sink::MemorySink`] sink and returns the associated
    /// [`MemorySinkStorage`].
    ///
    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
    /// terms of data durability and ordering.
    /// See [`Self::set_sink`] for more information.
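    ///
    /// Example (a minimal sketch; `take()` drains everything captured so far):
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// let storage = rec.memory();
    /// // … log some data …
    /// let log_msgs = storage.take();
    /// ```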
    pub fn memory(&self) -> MemorySinkStorage {
        let sink = crate::sink::MemorySink::new(self.clone());
        let storage = sink.buffer();
        self.set_sink(Box::new(sink));
        storage
    }

    /// Swaps the underlying sink for a [`crate::sink::BinaryStreamSink`] sink and returns the associated
    /// [`BinaryStreamStorage`].
    ///
    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
    /// terms of data durability and ordering.
    /// See [`Self::set_sink`] for more information.
    pub fn binary_stream(&self) -> BinaryStreamStorage {
        let (sink, storage) = crate::sink::BinaryStreamSink::new(self.clone());
        self.set_sink(Box::new(sink));
        storage
    }

    /// Swaps the underlying sink for a [`crate::sink::FileSink`] at the specified `path`.
    ///
    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
    /// terms of data durability and ordering.
    /// See [`Self::set_sink`] for more information.
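    ///
    /// Example (a minimal sketch; the file name is illustrative):
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// rec.save("my_recording.rrd").unwrap();
    /// ```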
    pub fn save(
        &self,
        path: impl Into<std::path::PathBuf>,
    ) -> Result<(), crate::sink::FileSinkError> {
        self.save_opts(path)
    }

    /// Swaps the underlying sink for a [`crate::sink::FileSink`] at the specified `path`.
    ///
    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
    /// terms of data durability and ordering.
    /// See [`Self::set_sink`] for more information.
    pub fn save_opts(
        &self,
        path: impl Into<std::path::PathBuf>,
    ) -> Result<(), crate::sink::FileSinkError> {
        if forced_sink_path().is_some() {
            re_log::debug!("Ignored setting new file since {ENV_FORCE_SAVE} is set");
            return Ok(());
        }

        let sink = crate::sink::FileSink::new(path)?;

        self.set_sink(Box::new(sink));

        Ok(())
    }

    /// Swaps the underlying sink for a [`crate::sink::FileSink`] pointed at stdout.
    ///
    /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
    /// default back to `buffered` mode, in order not to break the user's terminal.
    ///
    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
    /// terms of data durability and ordering.
    /// See [`Self::set_sink`] for more information.
    pub fn stdout(&self) -> Result<(), crate::sink::FileSinkError> {
        self.stdout_opts()
    }

    /// Swaps the underlying sink for a [`crate::sink::FileSink`] pointed at stdout.
    ///
    /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
    /// default back to `buffered` mode, in order not to break the user's terminal.
    ///
    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
    /// terms of data durability and ordering.
    /// See [`Self::set_sink`] for more information.
    pub fn stdout_opts(&self) -> Result<(), crate::sink::FileSinkError> {
        if forced_sink_path().is_some() {
            re_log::debug!("Ignored setting new file since {ENV_FORCE_SAVE} is set");
            return Ok(());
        }

        if std::io::stdout().is_terminal() {
            re_log::debug!("Ignored call to stdout() because stdout is a terminal");
            self.set_sink(Box::new(crate::log_sink::BufferedSink::new()));
            return Ok(());
        }

        let sink = crate::sink::FileSink::stdout()?;

        self.set_sink(Box::new(sink));

        Ok(())
    }

    /// Swaps the underlying sink for a [`crate::sink::BufferedSink`].
    ///
    /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
    /// terms of data durability and ordering.
    /// See [`Self::set_sink`] for more information.
    pub fn disconnect(&self) {
        let f = move |inner: &RecordingStreamInner| {
            // When disconnecting, we need to make sure that pending top-level `DataLoader` threads that
            // were started from the SDK run to completion.
            inner.wait_for_dataloaders();
            self.set_sink(Box::new(crate::sink::BufferedSink::new()));
        };

        if self.with(f).is_none() {
            re_log::warn_once!("Recording disabled - call to disconnect() ignored");
        }
    }

    /// Sends a blueprint through this recording stream.
    pub fn send_blueprint(
        &self,
        blueprint: Vec<LogMsg>,
        activation_cmd: BlueprintActivationCommand,
    ) {
        let mut blueprint_id = None;
        for msg in blueprint {
            if blueprint_id.is_none() {
                blueprint_id = Some(msg.store_id().clone());
            }
            self.record_msg(msg);
        }

        if let Some(blueprint_id) = blueprint_id {
            if blueprint_id == activation_cmd.blueprint_id {
                // Let the viewer know that the blueprint has been fully received,
                // and that it can now be activated.
                // We don't want to activate half-loaded blueprints, because that can be confusing,
                // and can also lead to problems with view heuristics.
                self.record_msg(activation_cmd.into());
            } else {
                re_log::warn!(
                    "Blueprint ID mismatch when sending blueprint: {} != {}. Ignoring activation.",
                    blueprint_id,
                    activation_cmd.blueprint_id
                );
            }
        }
    }
}

impl fmt::Debug for RecordingStream {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let with = |inner: &RecordingStreamInner| {
            let RecordingStreamInner {
                // This pattern match prevents _accidentally_ omitting data from the debug output
                // when new fields are added.
                info,
                properties,
                tick,
                cmds_tx: _,
                batcher: _,
                batcher_to_sink_handle: _,
                dataloader_handles,
                pid_at_creation,
            } = inner;

            f.debug_struct("RecordingStream")
                .field("info", &info)
                .field("properties", &properties)
                .field("tick", &tick)
                .field("pending_dataloaders", &dataloader_handles.lock().len())
                .field("pid_at_creation", &pid_at_creation)
                .finish_non_exhaustive()
        };

        match self.with(with) {
            Some(res) => res,
            None => write!(f, "RecordingStream {{ disabled }}"),
        }
    }
}

// --- Stateful time ---

/// Thread-local data.
#[derive(Default)]
struct ThreadInfo {
    /// The current time per-thread per-recording, which can be set by users.
    timepoints: HashMap<StoreId, TimePoint>,
}

impl ThreadInfo {
    fn thread_now(rid: &StoreId) -> TimePoint {
        Self::with(|ti| ti.now(rid))
    }

    fn set_thread_time(rid: &StoreId, timeline: TimelineName, cell: TimeCell) {
        Self::with(|ti| ti.set_time(rid, timeline, cell));
    }

    fn unset_thread_time(rid: &StoreId, timeline: &TimelineName) {
        Self::with(|ti| ti.unset_time(rid, timeline));
    }

    fn reset_thread_time(rid: &StoreId) {
        Self::with(|ti| ti.reset_time(rid));
    }

    /// Get access to the thread-local [`ThreadInfo`].
    fn with<R>(f: impl FnOnce(&mut Self) -> R) -> R {
        use std::cell::RefCell;
        thread_local! {
            static THREAD_INFO: RefCell<Option<ThreadInfo>> = const { RefCell::new(None) };
        }

        THREAD_INFO.with(|thread_info| {
            let mut thread_info = thread_info.borrow_mut();
            let thread_info = thread_info.get_or_insert_with(Self::default);
            f(thread_info)
        })
    }

    fn now(&self, rid: &StoreId) -> TimePoint {
        let mut timepoint = self.timepoints.get(rid).cloned().unwrap_or_default();
        timepoint.insert_cell(TimelineName::log_time(), TimeCell::timestamp_now());
        timepoint
    }

    fn set_time(&mut self, rid: &StoreId, timeline: TimelineName, cell: TimeCell) {
        self.timepoints
            .entry(rid.clone())
            .or_default()
            .insert_cell(timeline, cell);
    }

    fn unset_time(&mut self, rid: &StoreId, timeline: &TimelineName) {
        if let Some(timepoint) = self.timepoints.get_mut(rid) {
            timepoint.remove(timeline);
        }
    }

    fn reset_time(&mut self, rid: &StoreId) {
        if let Some(timepoint) = self.timepoints.get_mut(rid) {
            *timepoint = TimePoint::default();
        }
    }
}

impl RecordingStream {
    /// Returns the current time of the recording on the current thread.
    pub fn now(&self) -> TimePoint {
        let f = move |inner: &RecordingStreamInner| ThreadInfo::thread_now(&inner.info.store_id);
        if let Some(res) = self.with(f) {
            res
        } else {
            re_log::warn_once!("Recording disabled - call to now() ignored");
            TimePoint::default()
        }
    }

    /// Set the current time of the recording, for the current calling thread.
    ///
    /// Used for all subsequent logging performed from this same thread, until the next call
    /// to one of the index/time setting methods.
    ///
    /// There is no requirement of monotonicity. You can move the time backwards if you like.
    ///
    /// See also:
    /// - [`Self::set_time`]
    /// - [`Self::set_time_sequence`]
    /// - [`Self::set_duration_secs`]
    /// - [`Self::disable_timeline`]
    /// - [`Self::reset_time`]
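    ///
    /// Example (a minimal sketch; assumes `TimePoint` is re-exported and that
    /// `insert_cell` accepts anything convertible to a timeline name):
    /// ```ignore
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// let mut tp = rerun::TimePoint::default();
    /// tp.insert_cell("frame_nr", rerun::TimeCell::from_sequence(42));
    /// rec.set_timepoint(tp);
    /// ```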
    pub fn set_timepoint(&self, timepoint: impl Into<TimePoint>) {
        let f = move |inner: &RecordingStreamInner| {
            let timepoint = timepoint.into();
            for (timeline, time) in timepoint {
                ThreadInfo::set_thread_time(&inner.info.store_id, timeline, time);
            }
        };

        if self.with(f).is_none() {
            re_log::warn_once!("Recording disabled - call to set_timepoint() ignored");
        }
    }

    /// Set the current value of one of the timelines.
    ///
    /// Used for all subsequent logging performed from this same thread, until the next call
    /// to one of the index/time setting methods.
    ///
    /// There is no requirement of monotonicity. You can move the time backwards if you like.
    ///
    /// Example:
    /// ```no_run
    /// # mod rerun { pub use re_sdk::*; }
    /// # let rec: rerun::RecordingStream = unimplemented!();
    /// rec.set_time("frame_nr", rerun::TimeCell::from_sequence(42));
    /// rec.set_time("duration", std::time::Duration::from_millis(123));
    /// rec.set_time("capture_time", std::time::SystemTime::now());
    /// ```
    ///
    /// See also:
    /// - [`Self::set_timepoint`]
    /// - [`Self::set_time_sequence`]
    /// - [`Self::set_duration_secs`]
    /// - [`Self::disable_timeline`]
    /// - [`Self::reset_time`]
    pub fn set_time(&self, timeline: impl Into<TimelineName>, value: impl TryInto<TimeCell>) {
        let f = move |inner: &RecordingStreamInner| {
            let timeline = timeline.into();
            if let Ok(value) = value.try_into() {
                ThreadInfo::set_thread_time(&inner.info.store_id, timeline, value);
            } else {
                re_log::warn_once!(
                    "set_time({timeline}): Failed to convert the given value to a TimeCell"
                );
            }
        };

        if self.with(f).is_none() {
            re_log::warn_once!("Recording disabled - call to set_time() ignored");
        }
    }

    /// Set the current time of the recording, for the current calling thread.
    ///
    /// Short for `set_time(timeline, rerun::TimeCell::from_sequence(sequence))`.
    ///
    /// Used for all subsequent logging performed from this same thread, until the next call
    /// to one of the index/time setting methods.
    ///
    /// For example: `rec.set_time_sequence("frame_nr", frame_nr)`.
    /// You can remove a timeline again using `rec.disable_timeline("frame_nr")`.
    ///
    /// There is no requirement of monotonicity. You can move the time backwards if you like.
    ///
    /// See also:
    /// - [`Self::set_time`]
    /// - [`Self::set_timepoint`]
    /// - [`Self::set_duration_secs`]
    /// - [`Self::disable_timeline`]
    /// - [`Self::reset_time`]
    #[inline]
    pub fn set_time_sequence(&self, timeline: impl Into<TimelineName>, sequence: impl Into<i64>) {
        self.set_time(timeline, TimeCell::from_sequence(sequence.into()));
    }

    /// Set the current time of the recording, for the current calling thread.
    ///
    /// Short for `set_time(timeline, std::time::Duration::from_secs_f64(secs))`.
    ///
    /// Used for all subsequent logging performed from this same thread, until the next call
    /// to one of the index/time setting methods.
    ///
    /// For example: `rec.set_duration_secs("time_since_start", time_offset)`.
    /// You can remove a timeline again using `rec.disable_timeline("time_since_start")`.
    ///
    /// There is no requirement of monotonicity. You can move the time backwards if you like.
    ///
    /// See also:
    /// - [`Self::set_time`]
    /// - [`Self::set_timepoint`]
    /// - [`Self::set_timestamp_secs_since_epoch`]
    /// - [`Self::set_time_sequence`]
    /// - [`Self::disable_timeline`]
    /// - [`Self::reset_time`]
    #[inline]
    pub fn set_duration_secs(&self, timeline: impl Into<TimelineName>, secs: impl Into<f64>) {
        self.set_time(timeline, std::time::Duration::from_secs_f64(secs.into()));
    }

    /// Set a timestamp as seconds since Unix epoch (1970-01-01 00:00:00 UTC).
    ///
    /// Short for `self.set_time(timeline, rerun::TimeCell::from_timestamp_secs_since_epoch(secs))`.
    ///
    /// Used for all subsequent logging performed from this same thread, until the next call
    /// to one of the index/time setting methods.
    ///
    /// For example: `rec.set_timestamp_secs_since_epoch("capture_time", secs_since_epoch)`.
    /// You can remove a timeline again using `rec.disable_timeline("capture_time")`.
    ///
    /// There is no requirement of monotonicity. You can move the time backwards if you like.
    ///
    /// See also:
    /// - [`Self::set_time`]
    /// - [`Self::set_timepoint`]
    /// - [`Self::set_duration_secs`]
    /// - [`Self::set_time_sequence`]
    /// - [`Self::disable_timeline`]
    /// - [`Self::reset_time`]
    #[inline]
    pub fn set_timestamp_secs_since_epoch(
        &self,
        timeline: impl Into<TimelineName>,
        secs: impl Into<f64>,
    ) {
        self.set_time(
            timeline,
            TimeCell::from_timestamp_secs_since_epoch(secs.into()),
        );
    }

    /// Set the current time of the recording, for the current calling thread.
    ///
    /// Used for all subsequent logging performed from this same thread, until the next call
    /// to one of the index/time setting methods.
    ///
    /// For example: `rec.set_time_secs("sim_time", sim_time_secs)`.
    /// You can remove a timeline again using `rec.disable_timeline("sim_time")`.
    ///
    /// There is no requirement of monotonicity. You can move the time backwards if you like.
    ///
    /// See also:
    /// - [`Self::set_timepoint`]
    /// - [`Self::set_time_sequence`]
    /// - [`Self::set_time_nanos`]
    /// - [`Self::disable_timeline`]
    /// - [`Self::reset_time`]
    #[deprecated(
        since = "0.23.0",
        note = "Use either `set_duration_secs` or `set_timestamp_secs_since_epoch` instead"
    )]
    #[inline]
    pub fn set_time_secs(&self, timeline: impl Into<TimelineName>, seconds: impl Into<f64>) {
        self.set_duration_secs(timeline, seconds);
    }

    /// Set the current time of the recording, for the current calling thread.
    ///
    /// Used for all subsequent logging performed from this same thread, until the next call
    /// to one of the index/time setting methods.
    ///
    /// For example: `rec.set_time_nanos("sim_time", sim_time_nanos)`.
    /// You can remove a timeline again using `rec.disable_timeline("sim_time")`.
    ///
    /// There is no requirement of monotonicity. You can move the time backwards if you like.
    ///
    /// See also:
    /// - [`Self::set_timepoint`]
    /// - [`Self::set_time_sequence`]
    /// - [`Self::set_time_secs`]
    /// - [`Self::disable_timeline`]
    /// - [`Self::reset_time`]
    #[deprecated(
        since = "0.23.0",
        note = "Use `set_time` with either `rerun::TimeCell::from_duration_nanos` or `rerun::TimeCell::from_timestamp_nanos_since_epoch`, or with `std::time::Duration` or `std::time::SystemTime`."
    )]
    #[inline]
    pub fn set_time_nanos(
        &self,
        timeline: impl Into<TimelineName>,
        nanos_since_epoch: impl Into<i64>,
    ) {
        self.set_time(
            timeline,
            TimeCell::from_timestamp_nanos_since_epoch(nanos_since_epoch.into()),
        );
    }

    /// Clears out the current time of the recording for the specified timeline, for the
    /// current calling thread.
    ///
    /// For example: `rec.disable_timeline("frame")`, `rec.disable_timeline("sim_time")`.
    ///
    /// See also:
    /// - [`Self::set_timepoint`]
    /// - [`Self::set_time_sequence`]
    /// - [`Self::set_time_secs`]
    /// - [`Self::set_time_nanos`]
    /// - [`Self::reset_time`]
    pub fn disable_timeline(&self, timeline: impl Into<TimelineName>) {
        let f = move |inner: &RecordingStreamInner| {
            let timeline = timeline.into();
            ThreadInfo::unset_thread_time(&inner.info.store_id, &timeline);
        };

        if self.with(f).is_none() {
            re_log::warn_once!("Recording disabled - call to disable_timeline() ignored");
        }
    }

    /// Clears out the current time of the recording, for the current calling thread.
    ///
    /// Used for all subsequent logging performed from this same thread, until the next call
    /// to one of the index/time setting methods.
    ///
    /// For example: `rec.reset_time()`.
    ///
    /// See also:
    /// - [`Self::set_timepoint`]
    /// - [`Self::set_time_sequence`]
    /// - [`Self::set_time_secs`]
    /// - [`Self::set_time_nanos`]
    /// - [`Self::disable_timeline`]
    pub fn reset_time(&self) {
        let f = move |inner: &RecordingStreamInner| {
            ThreadInfo::reset_thread_time(&inner.info.store_id);
        };

        if self.with(f).is_none() {
            re_log::warn_once!("Recording disabled - call to reset_time() ignored");
        }
    }
}

// ---

#[cfg(test)]
2523mod tests {
2524    use re_log_types::example_components::MyLabel;
2525
2526    use super::*;
2527
2528    #[test]
2529    fn impl_send_sync() {
2530        fn assert_send_sync<T: Send + Sync>() {}
2531        assert_send_sync::<RecordingStream>();
2532    }
2533
2534    #[test]
2535    fn never_flush() {
2536        let rec = RecordingStreamBuilder::new("rerun_example_never_flush")
2537            .enabled(true)
2538            .batcher_config(ChunkBatcherConfig::NEVER)
2539            .buffered()
2540            .unwrap();
2541
2542        let store_info = rec.store_info().unwrap();
2543
2544        let rows = example_rows(false);
2545        for row in rows.clone() {
2546            rec.record_row("a".into(), row, false);
2547        }
2548
2549        let storage = rec.memory();
2550        let mut msgs = {
2551            let mut msgs = storage.take();
2552            msgs.reverse();
2553            msgs
2554        };
2555
2556        // First message should be a set_store_info resulting from the original sink swap to
2557        // buffered mode.
2558        match msgs.pop().unwrap() {
2559            LogMsg::SetStoreInfo(msg) => {
2560                assert!(msg.row_id != *RowId::ZERO);
2561                similar_asserts::assert_eq!(store_info, msg.info);
2562            }
2563            _ => panic!("expected SetStoreInfo"),
2564        }
2565
2566        // Second message should be a set_store_info resulting from the later sink swap from
2567        // buffered mode into in-memory mode.
2568        // This arrives _before_ the data itself since we're using manual flushing.
2569        match msgs.pop().unwrap() {
2570            LogMsg::SetStoreInfo(msg) => {
2571                assert!(msg.row_id != *RowId::ZERO);
2572                similar_asserts::assert_eq!(store_info, msg.info);
2573            }
2574            _ => panic!("expected SetStoreInfo"),
2575        }
2576
2577        // The following flushes were sent as a result of the implicit flush when swapping the
2578        // underlying sink from buffered to in-memory.
2579
2580        // Chunk that contains the `RecordProperties`.
2581        match msgs.pop().unwrap() {
2582            LogMsg::ArrowMsg(rid, msg) => {
2583                assert_eq!(store_info.store_id, rid);
2584
2585                let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2586
2587                chunk.sanity_check().unwrap();
2588            }
2589            _ => panic!("expected ArrowMsg"),
2590        }
2591
2592        // Another chunk that contains `RecordProperties`.
2593        match msgs.pop().unwrap() {
2594            LogMsg::ArrowMsg(rid, msg) => {
2595                assert_eq!(store_info.store_id, rid);
2596
2597                let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2598
2599                chunk.sanity_check().unwrap();
2600            }
2601            _ => panic!("expected ArrowMsg"),
2602        }
2603
2604        // Final message is the batched chunk itself.
2605        match msgs.pop().unwrap() {
2606            LogMsg::ArrowMsg(rid, msg) => {
2607                assert_eq!(store_info.store_id, rid);
2608
2609                let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2610
2611                chunk.sanity_check().unwrap();
2612            }
2613            _ => panic!("expected ArrowMsg"),
2614        }
2615
2616        // That's all.
2617        assert!(msgs.pop().is_none());
2618    }
2619
2620    #[test]
2621    fn always_flush() {
2622        let rec = RecordingStreamBuilder::new("rerun_example_always_flush")
2623            .enabled(true)
2624            .batcher_config(ChunkBatcherConfig::ALWAYS)
2625            .buffered()
2626            .unwrap();
2627
2628        let store_info = rec.store_info().unwrap();
2629
2630        let rows = example_rows(false);
2631        for row in rows.clone() {
2632            rec.record_row("a".into(), row, false);
2633        }
2634
2635        let storage = rec.memory();
2636        let mut msgs = {
2637            let mut msgs = storage.take();
2638            msgs.reverse();
2639            msgs
2640        };
2641
2642        // First message should be a set_store_info resulting from the original sink swap to
2643        // buffered mode.
2644        match msgs.pop().unwrap() {
2645            LogMsg::SetStoreInfo(msg) => {
2646                assert!(msg.row_id != *RowId::ZERO);
2647                similar_asserts::assert_eq!(store_info, msg.info);
2648            }
2649            _ => panic!("expected SetStoreInfo"),
2650        }
2651
2652        // Second message should be a set_store_info resulting from the later sink swap from
2653        // buffered mode into in-memory mode.
2654        // This arrives _before_ the data itself since we're using manual flushing.
2655        match msgs.pop().unwrap() {
2656            LogMsg::SetStoreInfo(msg) => {
2657                assert!(msg.row_id != *RowId::ZERO);
2658                similar_asserts::assert_eq!(store_info, msg.info);
2659            }
2660            _ => panic!("expected SetStoreInfo"),
2661        }
2662
2663        let mut assert_next_row = || match msgs.pop().unwrap() {
2664            LogMsg::ArrowMsg(rid, msg) => {
2665                assert_eq!(store_info.store_id, rid);
2666
2667                let chunk = Chunk::from_arrow_msg(&msg).unwrap();
2668
2669                chunk.sanity_check().unwrap();
2670            }
2671            _ => panic!("expected ArrowMsg"),
2672        };
2673
2674        // 3rd, 4th, 5th, 6th, and 7th messages are all the single-row batched chunks themselves,
2675        // which were sent as a result of the implicit flush when swapping the underlying sink
2676        // from buffered to in-memory. Note that these messages contain the 2 recording property
2677        // chunks.
2678        assert_next_row();
2679        assert_next_row();
2680        assert_next_row();
2681        assert_next_row();
2682        assert_next_row();
2683
2684        // That's all.
2685        assert!(msgs.pop().is_none());
2686    }
2687
2688    #[test]
2689    fn flush_hierarchy() {
2690        let (rec, storage) = RecordingStreamBuilder::new("rerun_example_flush_hierarchy")
2691            .enabled(true)
2692            .batcher_config(ChunkBatcherConfig::NEVER)
2693            .memory()
2694            .unwrap();
2695
2696        let store_info = rec.store_info().unwrap();
2697
2698        let rows = example_rows(false);
2699        for row in rows.clone() {
2700            rec.record_row("a".into(), row, false);
2701        }
2702
2703        {
2704            let mut msgs = {
2705                let mut msgs = storage.take();
2706                msgs.reverse();
2707                msgs
2708            };
2709
2710            // First message should be a set_store_info resulting from the original sink swap
2711            // to in-memory mode.
2712            match msgs.pop().unwrap() {
2713                LogMsg::SetStoreInfo(msg) => {
2714                    assert!(msg.row_id != *RowId::ZERO);
2715                    similar_asserts::assert_eq!(store_info, msg.info);
2716                }
2717                _ => panic!("expected SetStoreInfo"),
2718            }
2719
2720            // For reasons, MemorySink ends up with 2 StoreInfos.
2721            // TODO(jleibs): Avoid a redundant StoreInfo message.
            match msgs.pop().unwrap() {
                LogMsg::SetStoreInfo(msg) => {
                    assert!(msg.row_id != *RowId::ZERO);
                    similar_asserts::assert_eq!(store_info, msg.info);
                }
                _ => panic!("expected SetStoreInfo"),
            }

            // MemorySinkStorage transparently handles flushing during `take()`!

            // The batch that contains the `RecordingProperties`.
            match msgs.pop().unwrap() {
                LogMsg::ArrowMsg(rid, msg) => {
                    assert_eq!(store_info.store_id, rid);

                    let chunk = Chunk::from_arrow_msg(&msg).unwrap();

                    chunk.sanity_check().unwrap();
                }
                _ => panic!("expected ArrowMsg"),
            }

            // For the same reasons as above, another chunk that contains the `RecordingProperties`.
            match msgs.pop().unwrap() {
                LogMsg::ArrowMsg(rid, msg) => {
                    assert_eq!(store_info.store_id, rid);

                    let chunk = Chunk::from_arrow_msg(&msg).unwrap();

                    chunk.sanity_check().unwrap();
                }
                _ => panic!("expected ArrowMsg"),
            }

            // The batched chunk itself, sent as a result of the flush that `take()` performs.
            match msgs.pop().unwrap() {
                LogMsg::ArrowMsg(rid, msg) => {
                    assert_eq!(store_info.store_id, rid);

                    let chunk = Chunk::from_arrow_msg(&msg).unwrap();

                    chunk.sanity_check().unwrap();
                }
                _ => panic!("expected ArrowMsg"),
            }

            // That's all.
            assert!(msgs.pop().is_none());
        }
    }

    #[test]
    fn disabled() {
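        // With `enabled(false)`, the stream is a no-op: not even a `SetStoreInfo` should reach
        // the sink.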
        let (rec, storage) = RecordingStreamBuilder::new("rerun_example_disabled")
            .enabled(false)
            .batcher_config(ChunkBatcherConfig::ALWAYS)
            .memory()
            .unwrap();

        let rows = example_rows(false);
        for row in rows.clone() {
            rec.record_row("a".into(), row, false);
        }

        let mut msgs = {
            let mut msgs = storage.take();
            msgs.reverse();
            msgs
        };

        // That's all.
        assert!(msgs.pop().is_none());
    }

    #[test]
    fn test_set_thread_local() {
        // Regression-test for https://github.com/rerun-io/rerun/issues/2889
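        // Setting a thread-local recording stream from a thread that then exits immediately
        // must neither hang nor panic when the thread-local storage is torn down.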
        std::thread::Builder::new()
            .name("test_thread".to_owned())
            .spawn(|| {
                let stream = RecordingStreamBuilder::new("rerun_example_test")
                    .buffered()
                    .unwrap();
                RecordingStream::set_thread_local(StoreKind::Recording, Some(stream));
            })
            .unwrap()
            .join()
            .unwrap();
    }

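    // Builds three example rows on the same `frame_nr`: one with two points and a color, one
    // with all-empty components, and one with a color and a label but no points.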
    fn example_rows(static_: bool) -> Vec<PendingRow> {
        use re_log_types::example_components::{MyColor, MyLabel, MyPoint};
        use re_types::{Component as _, Loggable};

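        // Each call yields a timepoint on the `log_time`, `log_tick`, and `frame_nr` timelines
        // (or an empty timepoint when logging statically), bumping the tick counter either way.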
        let mut tick = 0i64;
        let mut timepoint = |frame_nr: i64| {
            let mut tp = TimePoint::default();
            if !static_ {
                tp.insert(Timeline::log_time(), re_log_types::Timestamp::now());
                tp.insert(Timeline::log_tick(), tick);
                tp.insert(Timeline::new_sequence("frame_nr"), frame_nr);
            }
            tick += 1;
            tp
        };

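        // Two points, one color, and an explicitly empty list of labels.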
        let row0 = {
            PendingRow {
                row_id: RowId::new(),
                timepoint: timepoint(1),
                components: [
                    (
                        MyPoint::descriptor(),
                        <MyPoint as Loggable>::to_arrow([
                            MyPoint::new(10.0, 10.0),
                            MyPoint::new(20.0, 20.0),
                        ])
                        .unwrap(),
                    ), //
                    (
                        MyColor::descriptor(),
                        <MyColor as Loggable>::to_arrow([MyColor(0x8080_80FF)]).unwrap(),
                    ), //
                    (
                        MyLabel::descriptor(),
                        <MyLabel as Loggable>::to_arrow([] as [MyLabel; 0]).unwrap(),
                    ), //
                ]
                .into_iter()
                .collect(),
            }
        };

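        // Same timepoint, but every component is empty this time.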
        let row1 = {
            PendingRow {
                row_id: RowId::new(),
                timepoint: timepoint(1),
                components: [
                    (
                        MyPoint::descriptor(),
                        <MyPoint as Loggable>::to_arrow([] as [MyPoint; 0]).unwrap(),
                    ), //
                    (
                        MyColor::descriptor(),
                        <MyColor as Loggable>::to_arrow([] as [MyColor; 0]).unwrap(),
                    ), //
                    (
                        MyLabel::descriptor(),
                        <MyLabel as Loggable>::to_arrow([] as [MyLabel; 0]).unwrap(),
                    ), //
                ]
                .into_iter()
                .collect(),
            }
        };

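        // No points this time, but a color and a label.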
        let row2 = {
            PendingRow {
                row_id: RowId::new(),
                timepoint: timepoint(1),
                components: [
                    (
                        MyPoint::descriptor(),
                        <MyPoint as Loggable>::to_arrow([] as [MyPoint; 0]).unwrap(),
                    ), //
                    (
                        MyColor::descriptor(),
                        <MyColor as Loggable>::to_arrow([MyColor(0xFFFF_FFFF)]).unwrap(),
                    ), //
                    (
                        MyLabel::descriptor(),
                        <MyLabel as Loggable>::to_arrow([MyLabel("hey".into())]).unwrap(),
                    ), //
                ]
                .into_iter()
                .collect(),
            }
        };

        vec![row0, row1, row2]
    }

    // See <https://github.com/rerun-io/rerun/pull/8587> for context.
    #[test]
    fn allows_componentbatch_unsized() {
        let labels = [
            MyLabel("a".into()),
            MyLabel("b".into()),
            MyLabel("c".into()),
        ];

        let (rec, _mem) = RecordingStreamBuilder::new("rerun_example_test_componentbatch_unsized")
            .default_enabled(false)
            .enabled(false)
            .memory()
            .unwrap();

        // This call used to *not* compile due to a lack of `?Sized` bounds.
        use re_types::ComponentBatch as _;
        rec.log("labels", &labels.try_serialized().unwrap())
            .unwrap();
    }
}