re_sdk/recording_stream.rs
1use std::fmt;
2use std::io::IsTerminal as _;
3use std::sync::Weak;
4use std::sync::{atomic::AtomicI64, Arc};
5use std::time::Duration;
6
7use ahash::HashMap;
8use crossbeam::channel::{Receiver, Sender};
9use itertools::Either;
10use nohash_hasher::IntMap;
11use parking_lot::Mutex;
12
13use re_chunk::{
14 Chunk, ChunkBatcher, ChunkBatcherConfig, ChunkBatcherError, ChunkComponents, ChunkError,
15 ChunkId, PendingRow, RowId, TimeColumn,
16};
17use re_log_types::{
18 ApplicationId, ArrowRecordBatchReleaseCallback, BlueprintActivationCommand, EntityPath, LogMsg,
19 StoreId, StoreInfo, StoreKind, StoreSource, TimeCell, TimeInt, TimePoint, Timeline,
20 TimelineName,
21};
22use re_types::archetypes::RecordingProperties;
23use re_types::components::Timestamp;
24use re_types::{AsComponents, SerializationError, SerializedComponentColumn};
25
26#[cfg(feature = "web_viewer")]
27use re_web_viewer_server::WebViewerServerPort;
28
29use crate::binary_stream_sink::BinaryStreamStorage;
30use crate::sink::{LogSink, MemorySinkStorage};
31
32// ---
33
/// Private environment variable meant for tests.
///
/// When set, all recording streams will write to disk at the path indicated by the env-var rather
/// than doing what they were asked to do - `connect()`, `buffered()`, even `save()` will re-use the same sink.
///
/// The value is read through [`forced_sink_path`].
const ENV_FORCE_SAVE: &str = "_RERUN_TEST_FORCE_SAVE";
39
40/// Returns path for force sink if private environment variable `_RERUN_TEST_FORCE_SAVE` is set
41///
42/// Newly created [`RecordingStream`]s should use a [`crate::sink::FileSink`] pointing to this path.
43/// Furthermore, [`RecordingStream::set_sink`] calls after this should not swap out to a new sink but re-use the existing one.
44/// Note that creating a new [`crate::sink::FileSink`] to the same file path (even temporarily) can cause
45/// a race between file creation (and thus clearing) and pending file writes.
46pub fn forced_sink_path() -> Option<String> {
47 std::env::var(ENV_FORCE_SAVE).ok()
48}
49
/// Errors that can occur when creating/manipulating a [`RecordingStream`].
///
/// Most variants wrap an error from an underlying component and can be produced with `?`
/// thanks to their `#[from]` conversions.
#[derive(thiserror::Error, Debug)]
pub enum RecordingStreamError {
    /// Error within the underlying file sink.
    #[error("Failed to create the underlying file sink: {0}")]
    FileSink(#[from] re_log_encoding::FileSinkError),

    /// Error while converting data into a valid chunk.
    #[error("Failed to convert data to a valid chunk: {0}")]
    Chunk(#[from] ChunkError),

    /// Error within the underlying chunk batcher.
    #[error("Failed to spawn the underlying batcher: {0}")]
    ChunkBatcher(#[from] ChunkBatcherError),

    /// Error within the underlying serializer.
    #[error("Failed to serialize component data: {0}")]
    Serialization(#[from] SerializationError),

    /// Error spawning one of the background threads.
    #[error("Failed to spawn background thread '{name}': {err}")]
    SpawnThread {
        /// Name of the thread
        name: String,

        /// Inner error explaining why the thread failed to spawn.
        err: std::io::Error,
    },

    /// Error spawning a Rerun Viewer process.
    #[error(transparent)] // makes bubbling all the way up to main look nice
    SpawnViewer(#[from] crate::SpawnError),

    /// Failure to host a web viewer and/or Rerun server.
    #[cfg(feature = "web_viewer")]
    #[error(transparent)]
    WebSink(#[from] crate::web_viewer::WebViewerSinkError),

    /// An error occurred while attempting to use a [`re_data_loader::DataLoader`].
    #[cfg(feature = "data_loaders")]
    #[error(transparent)]
    DataLoaderError(#[from] re_data_loader::DataLoaderError),

    /// Invalid gRPC server address.
    #[error(transparent)]
    UriError(#[from] re_uri::Error),

    /// The given URL parsed successfully, but does not point at a `/proxy` endpoint.
    #[error("not a `/proxy` endpoint")]
    NotAProxyEndpoint,

    /// Invalid bind IP.
    #[error(transparent)]
    InvalidAddress(#[from] std::net::AddrParseError),
}
105
/// Results that can occur when creating/manipulating a [`RecordingStream`].
///
/// Shorthand for a [`Result`] whose error type is [`RecordingStreamError`].
pub type RecordingStreamResult<T> = Result<T, RecordingStreamError>;
108
109// ---
110
/// Construct a [`RecordingStream`].
///
/// ``` no_run
/// # use re_sdk::RecordingStreamBuilder;
/// let rec = RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
///
/// Automatically sends a [`Chunk`] with the default [`RecordingProperties`] to
/// the sink, unless an explicit `recording_id` is set via [`RecordingStreamBuilder::recording_id`].
#[derive(Debug)]
pub struct RecordingStreamBuilder {
    // Identity of the store that will be created.
    application_id: ApplicationId,
    // Kind used when a random `StoreId` has to be generated.
    store_kind: StoreKind,
    // Explicit store id, if any; a random one is generated when `None`.
    store_id: Option<StoreId>,
    // Explicit store source, if any; defaults to `StoreSource::RustSdk` when `None`.
    store_source: Option<StoreSource>,

    // Fallback used to decide whether logging is enabled when `enabled` is `None`.
    default_enabled: bool,
    // Explicit on/off switch; takes precedence over `default_enabled` when set.
    enabled: Option<bool>,

    // Explicit batcher configuration; when `None` it is read from the environment instead.
    batcher_config: Option<ChunkBatcherConfig>,

    // Optional user-defined recording properties.
    // They are only handed to the stream when `should_send_properties` is true.
    should_send_properties: bool,
    properties: RecordingProperties,
}
137
138impl RecordingStreamBuilder {
139 /// Create a new [`RecordingStreamBuilder`] with the given [`ApplicationId`].
140 ///
141 /// The [`ApplicationId`] is usually the name of your app.
142 ///
143 /// ```no_run
144 /// # use re_sdk::RecordingStreamBuilder;
145 /// let rec = RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
146 /// # Ok::<(), Box<dyn std::error::Error>>(())
147 /// ```
148 //
149 // NOTE: track_caller so that we can see if we are being called from an official example.
150 #[track_caller]
151 pub fn new(application_id: impl Into<ApplicationId>) -> Self {
152 let application_id = application_id.into();
153
154 Self {
155 application_id,
156 store_kind: StoreKind::Recording,
157 store_id: None,
158 store_source: None,
159
160 default_enabled: true,
161 enabled: None,
162
163 batcher_config: None,
164
165 should_send_properties: true,
166 properties: RecordingProperties::new()
167 .with_start_time(re_types::components::Timestamp::now()),
168 }
169 }
170
171 /// Set whether or not Rerun is enabled by default.
172 ///
173 /// If the `RERUN` environment variable is set, it will override this.
174 ///
175 /// Set also: [`Self::enabled`].
176 #[inline]
177 pub fn default_enabled(mut self, default_enabled: bool) -> Self {
178 self.default_enabled = default_enabled;
179 self
180 }
181
182 /// Set whether or not Rerun is enabled.
183 ///
184 /// Setting this will ignore the `RERUN` environment variable.
185 ///
186 /// Set also: [`Self::default_enabled`].
187 #[inline]
188 pub fn enabled(mut self, enabled: bool) -> Self {
189 self.enabled = Some(enabled);
190 self
191 }
192
193 /// Set the `RecordingId` for this context.
194 ///
195 /// If you're logging from multiple processes and want all the messages to end up in the same
196 /// recording, you must make sure that they all set the same `RecordingId` using this function.
197 ///
198 /// Note that many stores can share the same [`ApplicationId`], but they all have
199 /// unique `RecordingId`s.
200 ///
201 /// The default is to use a random `RecordingId`.
202 ///
203 /// When explicitly setting a `RecordingId`, the initial chunk that contains the recording
204 /// properties will not be sent.
205 #[inline]
206 pub fn recording_id(mut self, recording_id: impl Into<String>) -> Self {
207 self.store_id = Some(StoreId::from_string(
208 StoreKind::Recording,
209 recording_id.into(),
210 ));
211 self.send_properties(false)
212 }
213
214 /// Sets an optional name for the recording.
215 #[inline]
216 pub fn recording_name(mut self, name: impl Into<String>) -> Self {
217 self.properties = self.properties.with_name(name.into());
218 self
219 }
220
221 /// Sets an optional name for the recording.
222 #[inline]
223 pub fn recording_started(mut self, started: impl Into<Timestamp>) -> Self {
224 self.properties = self.properties.with_start_time(started);
225 self
226 }
227
228 #[deprecated(since = "0.22.0", note = "use `send_properties` instead")]
229 /// Disables sending the [`RecordingProperties`] chunk.
230 #[inline]
231 pub fn disable_properties(mut self) -> Self {
232 self.should_send_properties = false;
233 self
234 }
235
236 /// Whether the [`RecordingProperties`] chunk should be sent.
237 #[inline]
238 pub fn send_properties(mut self, should_send: bool) -> Self {
239 self.should_send_properties = should_send;
240 self
241 }
242
243 /// Set the [`StoreId`] for this context.
244 ///
245 /// If you're logging from multiple processes and want all the messages to end up as the same
246 /// store, you must make sure they all set the same [`StoreId`] using this function.
247 ///
248 /// Note that many stores can share the same [`ApplicationId`], but they all have
249 /// unique [`StoreId`]s.
250 ///
251 /// The default is to use a random [`StoreId`].
252 #[inline]
253 pub fn store_id(mut self, store_id: StoreId) -> Self {
254 self.store_kind = store_id.kind;
255 self.store_id = Some(store_id);
256 self
257 }
258
259 /// Specifies the configuration of the internal data batching mechanism.
260 ///
261 /// See [`ChunkBatcher`] & [`ChunkBatcherConfig`] for more information.
262 #[inline]
263 pub fn batcher_config(mut self, config: ChunkBatcherConfig) -> Self {
264 self.batcher_config = Some(config);
265 self
266 }
267
268 #[doc(hidden)]
269 #[inline]
270 pub fn store_source(mut self, store_source: StoreSource) -> Self {
271 self.store_source = Some(store_source);
272 self
273 }
274
275 #[doc(hidden)]
276 #[inline]
277 pub fn blueprint(mut self) -> Self {
278 self.store_kind = StoreKind::Blueprint;
279 self
280 }
281
282 /// Creates a new [`RecordingStream`] that starts in a buffering state (RAM).
283 ///
284 /// ## Example
285 ///
286 /// ```
287 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").buffered()?;
288 /// # Ok::<(), Box<dyn std::error::Error>>(())
289 /// ```
290 pub fn buffered(self) -> RecordingStreamResult<RecordingStream> {
291 let (enabled, store_info, properties, batcher_config) = self.into_args();
292 if enabled {
293 RecordingStream::new(
294 store_info,
295 properties,
296 batcher_config,
297 Box::new(crate::log_sink::BufferedSink::new()),
298 )
299 } else {
300 re_log::debug!("Rerun disabled - call to buffered() ignored");
301 Ok(RecordingStream::disabled())
302 }
303 }
304
305 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
306 /// [`crate::log_sink::MemorySink`].
307 ///
308 /// ## Example
309 ///
310 /// ```
311 /// # fn log_data(_: &re_sdk::RecordingStream) { }
312 ///
313 /// let (rec, storage) = re_sdk::RecordingStreamBuilder::new("rerun_example_app").memory()?;
314 ///
315 /// log_data(&rec);
316 ///
317 /// let data = storage.take();
318 ///
319 /// # Ok::<(), Box<dyn std::error::Error>>(())
320 /// ```
321 pub fn memory(
322 self,
323 ) -> RecordingStreamResult<(RecordingStream, crate::log_sink::MemorySinkStorage)> {
324 let (enabled, store_info, properties, batcher_config) = self.into_args();
325 let rec = if enabled {
326 RecordingStream::new(
327 store_info,
328 properties,
329 batcher_config,
330 Box::new(crate::log_sink::BufferedSink::new()),
331 )
332 } else {
333 re_log::debug!("Rerun disabled - call to memory() ignored");
334 Ok(RecordingStream::disabled())
335 }?;
336
337 let sink = crate::log_sink::MemorySink::new(rec.clone());
338 let storage = sink.buffer();
339 // Using set_sink here is necessary because the MemorySink needs to know
340 // it's own RecordingStream, which means we can't use `new` above.
341 // This has the downside of a bit of creation overhead and an extra StoreInfo
342 // message being sent to the sink.
343 // TODO(jleibs): Figure out a cleaner way to handle this.
344 rec.set_sink(Box::new(sink));
345 Ok((rec, storage))
346 }
347
348 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
349 /// remote Rerun instance.
350 ///
351 /// See also [`Self::connect_grpc_opts`] if you wish to configure the connection.
352 ///
353 /// ## Example
354 ///
355 /// ```no_run
356 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").connect_grpc()?;
357 /// # Ok::<(), Box<dyn std::error::Error>>(())
358 /// ```
359 pub fn connect_grpc(self) -> RecordingStreamResult<RecordingStream> {
360 self.connect_grpc_opts(
361 format!(
362 "rerun+http://127.0.0.1:{}/proxy",
363 re_grpc_server::DEFAULT_SERVER_PORT
364 ),
365 crate::default_flush_timeout(),
366 )
367 }
368
369 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
370 /// remote Rerun instance.
371 ///
372 /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
373 /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
374 /// call to `flush` to block indefinitely if a connection cannot be established.
375 ///
376 /// ## Example
377 ///
378 /// ```no_run
379 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
380 /// .connect_grpc_opts("rerun+http://127.0.0.1:9876/proxy", re_sdk::default_flush_timeout())?;
381 /// # Ok::<(), Box<dyn std::error::Error>>(())
382 /// ```
383 pub fn connect_grpc_opts(
384 self,
385 url: impl Into<String>,
386 flush_timeout: Option<Duration>,
387 ) -> RecordingStreamResult<RecordingStream> {
388 let (enabled, store_info, properties, batcher_config) = self.into_args();
389 if enabled {
390 let url: String = url.into();
391 let re_uri::RedapUri::Proxy(uri) = url.as_str().parse()? else {
392 return Err(RecordingStreamError::NotAProxyEndpoint);
393 };
394
395 RecordingStream::new(
396 store_info,
397 properties,
398 batcher_config,
399 Box::new(crate::log_sink::GrpcSink::new(uri, flush_timeout)),
400 )
401 } else {
402 re_log::debug!("Rerun disabled - call to connect() ignored");
403 Ok(RecordingStream::disabled())
404 }
405 }
406
407 #[cfg(feature = "server")]
408 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
409 /// locally hosted gRPC server.
410 ///
411 /// The server is hosted on the default IP and port, and may be connected to by any SDK or Viewer
412 /// at `rerun+http://127.0.0.1:9876/proxy` or by just running `rerun --connect`.
413 ///
414 /// To configure the gRPC server's IP and port, use [`Self::serve_grpc_opts`] instead.
415 ///
416 /// The gRPC server will buffer in memory so that late connecting viewers will still get all the data.
417 /// You can limit the amount of data buffered by the gRPC server using [`Self::serve_grpc_opts`],
418 /// with the `server_memory_limit` argument. Once the memory limit is reached, the earliest logged data
419 /// will be dropped. Static data is never dropped.
420 pub fn serve_grpc(self) -> RecordingStreamResult<RecordingStream> {
421 self.serve_grpc_opts(
422 "0.0.0.0",
423 crate::DEFAULT_SERVER_PORT,
424 re_memory::MemoryLimit::from_fraction_of_total(0.75),
425 )
426 }
427
428 #[cfg(feature = "server")]
429 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
430 /// locally hosted gRPC server.
431 ///
432 /// The server is hosted on the given `bind_ip` and `port`, may be connected to by any SDK or Viewer
433 /// at `rerun+http://{bind_ip}:{port}/proxy`.
434 ///
435 /// `0.0.0.0` is a good default for `bind_ip`.
436 ///
437 /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
438 /// You can limit the amount of data buffered by the gRPC server with the `server_memory_limit` argument.
439 /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
440 pub fn serve_grpc_opts(
441 self,
442 bind_ip: impl AsRef<str>,
443 port: u16,
444 server_memory_limit: re_memory::MemoryLimit,
445 ) -> RecordingStreamResult<RecordingStream> {
446 let (enabled, store_info, properties, batcher_config) = self.into_args();
447 if enabled {
448 RecordingStream::new(
449 store_info,
450 properties,
451 batcher_config,
452 Box::new(crate::grpc_server::GrpcServerSink::new(
453 bind_ip.as_ref(),
454 port,
455 server_memory_limit,
456 )?),
457 )
458 } else {
459 re_log::debug!("Rerun disabled - call to serve_grpc() ignored");
460 Ok(RecordingStream::disabled())
461 }
462 }
463
464 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to an
465 /// RRD file on disk.
466 ///
467 /// The Rerun Viewer is able to read continuously from the resulting rrd file while it is being written.
468 /// However, depending on your OS and configuration, changes may not be immediately visible due to file caching.
469 /// This is a common issue on Windows and (to a lesser extent) on `MacOS`.
470 ///
471 /// ## Example
472 ///
473 /// ```no_run
474 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").save("my_recording.rrd")?;
475 /// # Ok::<(), Box<dyn std::error::Error>>(())
476 /// ```
477 #[cfg(not(target_arch = "wasm32"))]
478 pub fn save(
479 self,
480 path: impl Into<std::path::PathBuf>,
481 ) -> RecordingStreamResult<RecordingStream> {
482 let (enabled, store_info, properties, batcher_config) = self.into_args();
483
484 if enabled {
485 RecordingStream::new(
486 store_info,
487 properties,
488 batcher_config,
489 Box::new(crate::sink::FileSink::new(path)?),
490 )
491 } else {
492 re_log::debug!("Rerun disabled - call to save() ignored");
493 Ok(RecordingStream::disabled())
494 }
495 }
496
497 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to stdout.
498 ///
499 /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
500 /// default back to `buffered` mode, in order not to break the user's terminal.
501 ///
502 /// ## Example
503 ///
504 /// ```no_run
505 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").stdout()?;
506 /// # Ok::<(), Box<dyn std::error::Error>>(())
507 /// ```
508 #[cfg(not(target_arch = "wasm32"))]
509 pub fn stdout(self) -> RecordingStreamResult<RecordingStream> {
510 if std::io::stdout().is_terminal() {
511 re_log::debug!("Ignored call to stdout() because stdout is a terminal");
512 return self.buffered();
513 }
514
515 let (enabled, store_info, properties, batcher_config) = self.into_args();
516
517 if enabled {
518 RecordingStream::new(
519 store_info,
520 properties,
521 batcher_config,
522 Box::new(crate::sink::FileSink::stdout()?),
523 )
524 } else {
525 re_log::debug!("Rerun disabled - call to stdout() ignored");
526 Ok(RecordingStream::disabled())
527 }
528 }
529
530 /// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
531 /// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over gRPC.
532 ///
533 /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
534 /// that viewer instead of starting a new one.
535 ///
536 /// See also [`Self::spawn_opts`] if you wish to configure the behavior of thew Rerun process
537 /// as well as the underlying connection.
538 ///
539 /// ## Example
540 ///
541 /// ```no_run
542 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").spawn()?;
543 /// # Ok::<(), Box<dyn std::error::Error>>(())
544 /// ```
545 pub fn spawn(self) -> RecordingStreamResult<RecordingStream> {
546 self.spawn_opts(&Default::default(), crate::default_flush_timeout())
547 }
548
549 /// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
550 /// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over gRPC.
551 ///
552 /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
553 /// that viewer instead of starting a new one.
554 ///
555 /// The behavior of the spawned Viewer can be configured via `opts`.
556 /// If you're fine with the default behavior, refer to the simpler [`Self::spawn`].
557 ///
558 /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
559 /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
560 /// call to `flush` to block indefinitely if a connection cannot be established.
561 ///
562 /// ## Example
563 ///
564 /// ```no_run
565 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
566 /// .spawn_opts(&re_sdk::SpawnOptions::default(), re_sdk::default_flush_timeout())?;
567 /// # Ok::<(), Box<dyn std::error::Error>>(())
568 /// ```
569 pub fn spawn_opts(
570 self,
571 opts: &crate::SpawnOptions,
572 flush_timeout: Option<Duration>,
573 ) -> RecordingStreamResult<RecordingStream> {
574 if !self.is_enabled() {
575 re_log::debug!("Rerun disabled - call to spawn() ignored");
576 return Ok(RecordingStream::disabled());
577 }
578
579 let url = format!("rerun+http://{}/proxy", opts.connect_addr());
580
581 // NOTE: If `_RERUN_TEST_FORCE_SAVE` is set, all recording streams will write to disk no matter
582 // what, thus spawning a viewer is pointless (and probably not intended).
583 if forced_sink_path().is_some() {
584 return self.connect_grpc_opts(url, flush_timeout);
585 }
586
587 crate::spawn(opts)?;
588
589 self.connect_grpc_opts(url, flush_timeout)
590 }
591
    /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
    /// web-based Rerun viewer via gRPC.
    ///
    /// Deprecated: this simply forwards all of its arguments to [`Self::serve_web`].
    ///
    /// If the `open_browser` argument is `true`, your default browser will be opened with a
    /// connected web-viewer.
    ///
    /// If not, you can connect to this server using the `rerun` binary (`cargo install rerun-cli --locked`).
    ///
    /// ## Details
    /// This method will spawn two servers: one HTTPS server serving the Rerun Web Viewer `.html` and `.wasm` files,
    /// and then one gRPC server that streams the log data to the web viewer (or to a native viewer, or to multiple viewers).
    ///
    /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
    /// You can limit the amount of data buffered by the gRPC server with the `server_memory_limit` argument.
    /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
    ///
    /// ## Example
    ///
    /// ```ignore
    /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
    ///     .serve("0.0.0.0",
    ///            Default::default(),
    ///            Default::default(),
    ///            re_sdk::MemoryLimit::from_fraction_of_total(0.25),
    ///            true)?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    //
    // # TODO(#5531): keep static data around.
    #[cfg(feature = "web_viewer")]
    #[deprecated(
        since = "0.20.0",
        note = "use rec.serve_grpc() with rerun::serve_web_viewer() instead"
    )]
    pub fn serve(
        self,
        bind_ip: &str,
        web_port: WebViewerServerPort,
        grpc_port: u16,
        server_memory_limit: re_memory::MemoryLimit,
        open_browser: bool,
    ) -> RecordingStreamResult<RecordingStream> {
        // Pure forwarding: all behavior lives in `serve_web`.
        self.serve_web(
            bind_ip,
            web_port,
            grpc_port,
            server_memory_limit,
            open_browser,
        )
    }
642
643 /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
644 /// web-based Rerun viewer via gRPC.
645 ///
646 /// If the `open_browser` argument is `true`, your default browser will be opened with a
647 /// connected web-viewer.
648 ///
649 /// If not, you can connect to this server using the `rerun` binary (`cargo install rerun-cli --locked`).
650 ///
651 /// ## Details
652 /// This method will spawn two servers: one HTTPS server serving the Rerun Web Viewer `.html` and `.wasm` files,
653 /// and then one gRPC server that streams the log data to the web viewer (or to a native viewer, or to multiple viewers).
654 ///
655 /// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
656 /// You can limit the amount of data buffered by the gRPC server with the `server_memory_limit` argument.
657 /// Once reached, the earliest logged data will be dropped. Static data is never dropped.
658 ///
659 /// Calling `serve_web` is equivalent to calling [`Self::serve_grpc`] followed by [`crate::serve_web_viewer`].
660 ///
661 /// ## Example
662 ///
663 /// ```ignore
664 /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
665 /// .serve_web("0.0.0.0",
666 /// Default::default(),
667 /// Default::default(),
668 /// re_sdk::MemoryLimit::from_fraction_of_total(0.25),
669 /// true)?;
670 /// # Ok::<(), Box<dyn std::error::Error>>(())
671 /// ```
672 //
673 // # TODO(#5531): keep static data around.
674 #[cfg(feature = "web_viewer")]
675 pub fn serve_web(
676 self,
677 bind_ip: &str,
678 web_port: WebViewerServerPort,
679 grpc_port: u16,
680 server_memory_limit: re_memory::MemoryLimit,
681 open_browser: bool,
682 ) -> RecordingStreamResult<RecordingStream> {
683 let (enabled, store_info, properties, batcher_config) = self.into_args();
684 if enabled {
685 let sink = crate::web_viewer::new_sink(
686 open_browser,
687 bind_ip,
688 web_port,
689 grpc_port,
690 server_memory_limit,
691 )?;
692 RecordingStream::new(store_info, properties, batcher_config, sink)
693 } else {
694 re_log::debug!("Rerun disabled - call to serve() ignored");
695 Ok(RecordingStream::disabled())
696 }
697 }
698
699 /// Returns whether or not logging is enabled, a [`StoreInfo`] and the associated batcher
700 /// configuration.
701 ///
702 /// This can be used to then construct a [`RecordingStream`] manually using
703 /// [`RecordingStream::new`].
704 pub fn into_args(
705 self,
706 ) -> (
707 bool,
708 StoreInfo,
709 Option<RecordingProperties>,
710 ChunkBatcherConfig,
711 ) {
712 let enabled = self.is_enabled();
713
714 let Self {
715 application_id,
716 store_kind,
717 store_id,
718 store_source,
719 default_enabled: _,
720 enabled: _,
721 batcher_config,
722 should_send_properties,
723 properties,
724 } = self;
725
726 let store_id = store_id.unwrap_or(StoreId::random(store_kind));
727 let store_source = store_source.unwrap_or_else(|| StoreSource::RustSdk {
728 rustc_version: env!("RE_BUILD_RUSTC_VERSION").into(),
729 llvm_version: env!("RE_BUILD_LLVM_VERSION").into(),
730 });
731
732 let store_info = StoreInfo {
733 application_id,
734 store_id,
735 cloned_from: None,
736 store_source,
737 started: None,
738 store_version: Some(re_build_info::CrateVersion::LOCAL),
739 };
740
741 let batcher_config =
742 batcher_config.unwrap_or_else(|| match ChunkBatcherConfig::from_env() {
743 Ok(config) => config,
744 Err(err) => {
745 re_log::error!("Failed to parse ChunkBatcherConfig from env: {}", err);
746 ChunkBatcherConfig::default()
747 }
748 });
749
750 (
751 enabled,
752 store_info,
753 should_send_properties.then_some(properties),
754 batcher_config,
755 )
756 }
757
758 /// Internal check for whether or not logging is enabled using explicit/default settings & env var.
759 fn is_enabled(&self) -> bool {
760 self.enabled
761 .unwrap_or_else(|| crate::decide_logging_enabled(self.default_enabled))
762 }
763}
764
765// ----------------------------------------------------------------------------
766
/// A [`RecordingStream`] handles everything related to logging data into Rerun.
///
/// You can construct a new [`RecordingStream`] using [`RecordingStreamBuilder`] or
/// [`RecordingStream::new`].
///
/// ## Sinks
///
/// Data is logged into Rerun via [`LogSink`]s.
///
/// The underlying [`LogSink`] of a [`RecordingStream`] can be changed at any point during its
/// lifetime by calling [`RecordingStream::set_sink`] or one of the higher level helpers
/// ([`RecordingStream::connect_grpc`], [`RecordingStream::memory`],
/// [`RecordingStream::save`], [`RecordingStream::disconnect`]).
///
/// See [`RecordingStream::set_sink`] for more information.
///
/// ## Multithreading and ordering
///
/// [`RecordingStream`] can be cheaply cloned and used freely across any number of threads.
///
/// Internally, all operations are linearized into a pipeline:
/// - All operations sent by a given thread will take effect in the same exact order as that
///   thread originally sent them in, from its point of view.
/// - There isn't any well defined global order across multiple threads.
///
/// This means that e.g. flushing the pipeline ([`Self::flush_blocking`]) guarantees that all previous data sent by the calling thread
/// has been recorded and (if applicable) flushed to the underlying OS-managed file descriptor,
/// but other threads may still have data in flight.
///
/// ## Shutdown
///
/// The [`RecordingStream`] can only be shutdown by dropping all instances of it, at which point
/// it will automatically take care of flushing any pending data that might remain in the pipeline.
///
/// Shutting down cannot ever block.
#[derive(Clone)]
pub struct RecordingStream {
    // `Left` is a strong (owning) handle, `Right` is a weak handle as produced by `clone_weak`.
    // The shared `Option` may be `None`, in which case there is no inner stream to operate on
    // (presumably the disabled case; confirm against `RecordingStream::disabled`).
    inner: Either<Arc<Option<RecordingStreamInner>>, Weak<Option<RecordingStreamInner>>>,
}
806
807impl RecordingStream {
808 /// Passes a reference to the [`RecordingStreamInner`], if it exists.
809 ///
810 /// This works whether the underlying stream is strong or weak.
811 #[inline]
812 fn with<F: FnOnce(&RecordingStreamInner) -> R, R>(&self, f: F) -> Option<R> {
813 use std::ops::Deref as _;
814 match &self.inner {
815 Either::Left(strong) => strong.deref().as_ref().map(f),
816 Either::Right(weak) => weak
817 .upgrade()
818 .and_then(|strong| strong.deref().as_ref().map(f)),
819 }
820 }
821
822 /// Clones the [`RecordingStream`] without incrementing the refcount.
823 ///
824 /// Useful e.g. if you want to make sure that a detached thread won't prevent the [`RecordingStream`]
825 /// from flushing during shutdown.
826 //
827 // TODO(#5335): shutdown flushing behavior is too brittle.
828 #[inline]
829 pub fn clone_weak(&self) -> Self {
830 Self {
831 inner: match &self.inner {
832 Either::Left(strong) => Either::Right(Arc::downgrade(strong)),
833 Either::Right(weak) => Either::Right(Weak::clone(weak)),
834 },
835 }
836 }
837}
838
839// TODO(#5335): shutdown flushing behavior is too brittle.
840impl Drop for RecordingStream {
841 #[inline]
842 fn drop(&mut self) {
843 // If this holds the last strong handle to the recording, make sure that all pending
844 // `DataLoader` threads that were started from the SDK actually run to completion (they
845 // all hold a weak handle to this very recording!).
846 //
847 // NOTE: It's very important to do so from the `Drop` implementation of `RecordingStream`
848 // itself, because the dataloader threads -- by definition -- will have to send data into
849 // this very recording, therefore we must make sure that at least one strong handle still lives
850 // on until they are all finished.
851 if let Either::Left(strong) = &mut self.inner {
852 if Arc::strong_count(strong) == 1 {
853 // Keep the recording alive until all dataloaders are finished.
854 self.with(|inner| inner.wait_for_dataloaders());
855 }
856 }
857 }
858}
859
/// Shared state behind every handle of a [`RecordingStream`].
///
/// Dropping it flushes the batcher and shuts down the forwarding thread
/// (see its `Drop` implementation).
struct RecordingStreamInner {
    /// Identity of the store; sent to the sink at creation time.
    info: StoreInfo,

    /// Recording properties, if they are meant to be sent.
    properties: Option<RecordingProperties>,

    // NOTE(review): presumably a monotonically increasing log tick counter; confirm against users.
    tick: AtomicI64,

    /// The one and only entrypoint into the pipeline: this is _never_ cloned nor publicly exposed,
    /// therefore the `Drop` implementation is guaranteed that no more data can come in while it's
    /// running.
    cmds_tx: Sender<Command>,

    /// Batches logged data into chunks.
    batcher: ChunkBatcher,

    /// Handle of the thread forwarding batcher output and commands to the sink.
    /// Taken and joined on drop.
    batcher_to_sink_handle: Option<std::thread::JoinHandle<()>>,

    /// Keeps track of the top-level threads that were spawned in order to execute the `DataLoader`
    /// machinery in the context of this `RecordingStream`.
    ///
    /// See [`RecordingStream::log_file_from_path`] and [`RecordingStream::log_file_from_contents`].
    dataloader_handles: Mutex<Vec<std::thread::JoinHandle<()>>>,

    // NOTE(review): presumably the process id captured at creation, used for fork detection
    // (see `is_forked_child`); confirm.
    pid_at_creation: u32,
}
881
882impl fmt::Debug for RecordingStreamInner {
883 #[inline]
884 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
885 f.debug_struct("RecordingStreamInner")
886 .field("info", &self.info.store_id)
887 .finish()
888 }
889}
890
891impl Drop for RecordingStreamInner {
892 fn drop(&mut self) {
893 if self.is_forked_child() {
894 re_log::error_once!(
895 "Fork detected while dropping RecordingStreamInner. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
896 );
897 return;
898 }
899
900 self.wait_for_dataloaders();
901
902 // NOTE: The command channel is private, if we're here, nothing is currently capable of
903 // sending data down the pipeline.
904 self.batcher.flush_blocking();
905 self.cmds_tx.send(Command::PopPendingChunks).ok();
906 self.cmds_tx.send(Command::Shutdown).ok();
907 if let Some(handle) = self.batcher_to_sink_handle.take() {
908 handle.join().ok();
909 }
910 }
911}
912
913impl RecordingStreamInner {
914 fn new(
915 info: StoreInfo,
916 properties: Option<RecordingProperties>,
917 batcher_config: ChunkBatcherConfig,
918 sink: Box<dyn LogSink>,
919 ) -> RecordingStreamResult<Self> {
920 let on_release = batcher_config.hooks.on_release.clone();
921 let batcher = ChunkBatcher::new(batcher_config)?;
922
923 {
924 re_log::debug!(
925 app_id = %info.application_id,
926 rec_id = %info.store_id,
927 "setting recording info",
928 );
929 sink.send(
930 re_log_types::SetStoreInfo {
931 row_id: *RowId::new(),
932 info: info.clone(),
933 }
934 .into(),
935 );
936 }
937
938 let (cmds_tx, cmds_rx) = crossbeam::channel::unbounded();
939
940 let batcher_to_sink_handle = {
941 const NAME: &str = "RecordingStream::batcher_to_sink";
942 std::thread::Builder::new()
943 .name(NAME.into())
944 .spawn({
945 let info = info.clone();
946 let batcher = batcher.clone();
947 move || forwarding_thread(info, sink, cmds_rx, batcher.chunks(), on_release)
948 })
949 .map_err(|err| RecordingStreamError::SpawnThread {
950 name: NAME.into(),
951 err,
952 })?
953 };
954
955 if let Some(properties) = properties.as_ref() {
956 // We pre-populate the batcher with a chunk the contains the recording
957 // properties, so that these get automatically sent to the sink.
958
959 re_log::debug!(properties = ?properties, "adding recording properties to batcher");
960
961 let properties_chunk = Chunk::builder(EntityPath::recording_properties())
962 .with_archetype(RowId::new(), TimePoint::default(), properties)
963 .build()?;
964
965 batcher.push_chunk(properties_chunk);
966 }
967
968 Ok(Self {
969 info,
970 properties,
971 tick: AtomicI64::new(0),
972 cmds_tx,
973 batcher,
974 batcher_to_sink_handle: Some(batcher_to_sink_handle),
975 dataloader_handles: Mutex::new(Vec::new()),
976 pid_at_creation: std::process::id(),
977 })
978 }
979
980 #[inline]
981 pub fn is_forked_child(&self) -> bool {
982 self.pid_at_creation != std::process::id()
983 }
984
985 /// Make sure all pending top-level `DataLoader` threads that were started from the SDK run to completion.
986 //
987 // TODO(cmc): At some point we might want to make it configurable, though I cannot really
988 // think of a use case where you'd want to drop those threads immediately upon
989 // disconnection.
990 fn wait_for_dataloaders(&self) {
991 let dataloader_handles = std::mem::take(&mut *self.dataloader_handles.lock());
992 for handle in dataloader_handles {
993 handle.join().ok();
994 }
995 }
996}
997
/// Commands sent over the private command channel and handled by `forwarding_thread`.
enum Command {
    /// Send the message to the current sink.
    RecordMsg(LogMsg),

    /// Replace the current sink: the old sink's backlog is drained and the old sink flushed,
    /// then the new sink receives the store info followed by that backlog.
    SwapSink(Box<dyn LogSink>),

    /// Flush the current sink, then drop the sender to signal whoever holds the paired receiver.
    Flush(Sender<()>),

    /// Does nothing by itself: it wakes the forwarding thread so that it drains all pending
    /// chunks before handling the next command.
    PopPendingChunks,

    /// Stop the forwarding thread.
    Shutdown,
}
1005
1006impl Command {
1007 fn flush() -> (Self, Receiver<()>) {
1008 let (tx, rx) = crossbeam::channel::bounded(0); // oneshot
1009 (Self::Flush(tx), rx)
1010 }
1011}
1012
1013impl RecordingStream {
1014 /// Creates a new [`RecordingStream`] with a given [`StoreInfo`] and [`LogSink`].
1015 ///
1016 /// You can create a [`StoreInfo`] with [`crate::new_store_info`];
1017 ///
1018 /// The [`StoreInfo`] is immediately sent to the sink in the form of a
1019 /// [`re_log_types::SetStoreInfo`].
1020 ///
1021 /// You can find sinks in [`crate::sink`].
1022 ///
1023 /// See also: [`RecordingStreamBuilder`].
1024 #[must_use = "Recording will get closed automatically once all instances of this object have been dropped"]
1025 pub fn new(
1026 info: StoreInfo,
1027 properties: Option<RecordingProperties>,
1028 batcher_config: ChunkBatcherConfig,
1029 sink: Box<dyn LogSink>,
1030 ) -> RecordingStreamResult<Self> {
1031 let sink = (info.store_id.kind == StoreKind::Recording)
1032 .then(forced_sink_path)
1033 .flatten()
1034 .map_or(sink, |path| {
1035 re_log::info!("Forcing FileSink because of env-var {ENV_FORCE_SAVE}={path:?}");
1036 // `unwrap` is ok since this force sinks are only used in tests.
1037 Box::new(crate::sink::FileSink::new(path).unwrap()) as Box<dyn LogSink>
1038 });
1039
1040 let stream =
1041 RecordingStreamInner::new(info, properties, batcher_config, sink).map(|inner| {
1042 Self {
1043 inner: Either::Left(Arc::new(Some(inner))),
1044 }
1045 })?;
1046
1047 Ok(stream)
1048 }
1049
1050 /// Creates a new no-op [`RecordingStream`] that drops all logging messages, doesn't allocate
1051 /// any memory and doesn't spawn any threads.
1052 ///
1053 /// [`Self::is_enabled`] will return `false`.
1054 pub fn disabled() -> Self {
1055 Self {
1056 inner: Either::Left(Arc::new(None)),
1057 }
1058 }
1059}
1060
1061impl RecordingStream {
1062 /// Log data to Rerun.
1063 ///
1064 /// This is the main entry point for logging data to rerun. It can be used to log anything
1065 /// that implements the [`AsComponents`], such as any [archetype](https://docs.rs/rerun/latest/rerun/archetypes/index.html)
1066 /// or individual [component](https://docs.rs/rerun/latest/rerun/components/index.html).
1067 ///
1068 /// The data will be timestamped automatically based on the [`RecordingStream`]'s internal clock.
1069 /// See [`RecordingStream::set_time_sequence`] etc for more information.
1070 ///
1071 /// The entity path can either be a string
1072 /// (with special characters escaped, split on unescaped slashes)
1073 /// or an [`EntityPath`] constructed with [`crate::entity_path`].
1074 /// See <https://www.rerun.io/docs/concepts/entity-path> for more on entity paths.
1075 ///
1076 /// See also: [`Self::log_static`] for logging static data.
1077 ///
1078 /// Internally, the stream will automatically micro-batch multiple log calls to optimize
1079 /// transport.
1080 /// See [SDK Micro Batching] for more information.
1081 ///
1082 /// # Example:
1083 /// ```ignore
1084 /// # use rerun;
1085 /// # let (rec, storage) = rerun::RecordingStreamBuilder::new("rerun_example_points3d_simple").memory()?;
1086 /// rec.log(
1087 /// "my/points",
1088 /// &rerun::Points3D::new([(0.0, 0.0, 0.0), (1.0, 1.0, 1.0)]),
1089 /// )?;
1090 /// # Ok::<(), Box<dyn std::error::Error>>(())
1091 /// ```
1092 ///
1093 /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
1094 /// [component bundle]: [`AsComponents`]
1095 #[inline]
1096 pub fn log<AS: ?Sized + AsComponents>(
1097 &self,
1098 ent_path: impl Into<EntityPath>,
1099 as_components: &AS,
1100 ) -> RecordingStreamResult<()> {
1101 self.log_with_static(ent_path, false, as_components)
1102 }
1103
1104 /// Lower-level logging API to provide data spanning multiple timepoints.
1105 ///
1106 /// Unlike the regular `log` API, which is row-oriented, this API lets you submit the data
1107 /// in a columnar form. The lengths of all of the [`TimeColumn`] and the component columns
1108 /// must match. All data that occurs at the same index across the different index/time and components
1109 /// arrays will act as a single logical row.
1110 ///
1111 /// Note that this API ignores any stateful index/time set on the log stream via the
1112 /// [`Self::set_time`]/[`Self::set_timepoint`]/[`Self::set_time_nanos`]/etc. APIs.
1113 /// Furthermore, this will _not_ inject the default timelines `log_tick` and `log_time` timeline columns.
1114 pub fn send_columns(
1115 &self,
1116 ent_path: impl Into<EntityPath>,
1117 indexes: impl IntoIterator<Item = TimeColumn>,
1118 columns: impl IntoIterator<Item = SerializedComponentColumn>,
1119 ) -> RecordingStreamResult<()> {
1120 let id = ChunkId::new();
1121
1122 let indexes = indexes
1123 .into_iter()
1124 .map(|col| (*col.timeline().name(), col))
1125 .collect();
1126
1127 let components: ChunkComponents = columns
1128 .into_iter()
1129 .map(|column| (column.descriptor, column.list_array))
1130 .collect();
1131
1132 let chunk = Chunk::from_auto_row_ids(id, ent_path.into(), indexes, components)?;
1133
1134 self.send_chunk(chunk);
1135
1136 Ok(())
1137 }
1138
1139 /// Log data to Rerun.
1140 ///
1141 /// It can be used to log anything
1142 /// that implements the [`AsComponents`], such as any [archetype](https://docs.rs/rerun/latest/rerun/archetypes/index.html)
1143 /// or individual [component](https://docs.rs/rerun/latest/rerun/components/index.html).
1144 ///
1145 /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
1146 /// any temporal data of the same type.
1147 /// All timestamp data associated with this message will be dropped right before sending it to Rerun.
1148 ///
1149 /// This is most often used for [`rerun::ViewCoordinates`](https://docs.rs/rerun/latest/rerun/archetypes/struct.ViewCoordinates.html) and
1150 /// [`rerun::AnnotationContext`](https://docs.rs/rerun/latest/rerun/archetypes/struct.AnnotationContext.html).
1151 ///
1152 /// Internally, the stream will automatically micro-batch multiple log calls to optimize
1153 /// transport.
1154 /// See [SDK Micro Batching] for more information.
1155 ///
1156 /// See also [`Self::log`].
1157 ///
1158 /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
1159 /// [component bundle]: [`AsComponents`]
1160 #[inline]
1161 pub fn log_static<AS: ?Sized + AsComponents>(
1162 &self,
1163 ent_path: impl Into<EntityPath>,
1164 as_components: &AS,
1165 ) -> RecordingStreamResult<()> {
1166 self.log_with_static(ent_path, true, as_components)
1167 }
1168
1169 /// Logs the contents of a [component bundle] into Rerun.
1170 ///
1171 /// If `static_` is set to `true`, all timestamp data associated with this message will be
1172 /// dropped right before sending it to Rerun.
1173 /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
1174 /// any temporal data of the same type.
1175 ///
1176 /// Otherwise, the data will be timestamped automatically based on the [`RecordingStream`]'s
1177 /// internal clock.
1178 /// See `RecordingStream::set_time_*` family of methods for more information.
1179 ///
1180 /// The entity path can either be a string
1181 /// (with special characters escaped, split on unescaped slashes)
1182 /// or an [`EntityPath`] constructed with [`crate::entity_path`].
1183 /// See <https://www.rerun.io/docs/concepts/entity-path> for more on entity paths.
1184 ///
1185 /// Internally, the stream will automatically micro-batch multiple log calls to optimize
1186 /// transport.
1187 /// See [SDK Micro Batching] for more information.
1188 ///
1189 /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
1190 /// [component bundle]: [`AsComponents`]
1191 #[inline]
1192 pub fn log_with_static<AS: ?Sized + AsComponents>(
1193 &self,
1194 ent_path: impl Into<EntityPath>,
1195 static_: bool,
1196 as_components: &AS,
1197 ) -> RecordingStreamResult<()> {
1198 let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
1199 self.log_serialized_batches_impl(
1200 row_id,
1201 ent_path,
1202 static_,
1203 as_components.as_serialized_batches(),
1204 )
1205 }
1206
1207 /// Logs a set of [`SerializedComponentBatch`]es into Rerun.
1208 ///
1209 /// If `static_` is set to `true`, all timestamp data associated with this message will be
1210 /// dropped right before sending it to Rerun.
1211 /// Static data has no time associated with it, exists on all timelines, and unconditionally shadows
1212 /// any temporal data of the same type.
1213 ///
1214 /// Otherwise, the data will be timestamped automatically based on the [`RecordingStream`]'s
1215 /// internal clock.
1216 /// See `RecordingStream::set_time_*` family of methods for more information.
1217 ///
1218 /// The number of instances will be determined by the longest batch in the bundle.
1219 ///
1220 /// The entity path can either be a string
1221 /// (with special characters escaped, split on unescaped slashes)
1222 /// or an [`EntityPath`] constructed with [`crate::entity_path`].
1223 /// See <https://www.rerun.io/docs/concepts/entity-path> for more on entity paths.
1224 ///
1225 /// Internally, the stream will automatically micro-batch multiple log calls to optimize
1226 /// transport.
1227 /// See [SDK Micro Batching] for more information.
1228 ///
1229 /// [SDK Micro Batching]: https://www.rerun.io/docs/reference/sdk/micro-batching
1230 ///
1231 /// [`SerializedComponentBatch`]: [re_types_core::SerializedComponentBatch]
1232 pub fn log_serialized_batches(
1233 &self,
1234 ent_path: impl Into<EntityPath>,
1235 static_: bool,
1236 comp_batches: impl IntoIterator<Item = re_types::SerializedComponentBatch>,
1237 ) -> RecordingStreamResult<()> {
1238 let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
1239 self.log_serialized_batches_impl(row_id, ent_path, static_, comp_batches)
1240 }
1241
1242 /// Sends a property to the recording.
1243 #[inline]
1244 pub fn send_property<AS: ?Sized + AsComponents>(
1245 &self,
1246 name: impl Into<String>,
1247 values: &AS,
1248 ) -> RecordingStreamResult<()> {
1249 let sub_path = EntityPath::from(name.into());
1250 self.log_static(EntityPath::properties().join(&sub_path), values)
1251 }
1252
1253 /// Sends the name of the recording.
1254 #[inline]
1255 pub fn send_recording_name(&self, name: impl Into<String>) -> RecordingStreamResult<()> {
1256 let update = RecordingProperties::update_fields().with_name(name.into());
1257 self.log_static(EntityPath::recording_properties(), &update)
1258 }
1259
1260 /// Sends the start time of the recording.
1261 #[inline]
1262 pub fn send_recording_start_time(
1263 &self,
1264 timestamp: impl Into<Timestamp>,
1265 ) -> RecordingStreamResult<()> {
1266 let update = RecordingProperties::update_fields().with_start_time(timestamp.into());
1267 self.log_static(EntityPath::recording_properties(), &update)
1268 }
1269
1270 // NOTE: For bw and fw compatibility reasons, we need our logging APIs to be fallible, even
1271 // though they really aren't at the moment.
1272 #[allow(clippy::unnecessary_wraps)]
1273 fn log_serialized_batches_impl(
1274 &self,
1275 row_id: RowId,
1276 entity_path: impl Into<EntityPath>,
1277 static_: bool,
1278 comp_batches: impl IntoIterator<Item = re_types::SerializedComponentBatch>,
1279 ) -> RecordingStreamResult<()> {
1280 if !self.is_enabled() {
1281 return Ok(()); // silently drop the message
1282 }
1283
1284 let entity_path = entity_path.into();
1285
1286 let comp_batches: Vec<_> = comp_batches
1287 .into_iter()
1288 .map(|comp_batch| (comp_batch.descriptor, comp_batch.array))
1289 .collect();
1290 let components: IntMap<_, _> = comp_batches.into_iter().collect();
1291
1292 // NOTE: The timepoint is irrelevant, the `RecordingStream` will overwrite it using its
1293 // internal clock.
1294 let timepoint = TimePoint::default();
1295
1296 if !components.is_empty() {
1297 let row = PendingRow {
1298 row_id,
1299 timepoint,
1300 components,
1301 };
1302 self.record_row(entity_path, row, !static_);
1303 }
1304
1305 Ok(())
1306 }
1307
/// Logs the file at the given `path` using all [`re_data_loader::DataLoader`]s available.
///
/// A single `path` might be handled by more than one loader.
///
/// This method blocks until either at least one [`re_data_loader::DataLoader`] starts
/// streaming data in or all of them fail.
///
/// See <https://www.rerun.io/docs/reference/data-loaders/overview> for more information.
#[cfg(feature = "data_loaders")]
pub fn log_file_from_path(
    &self,
    filepath: impl AsRef<std::path::Path>,
    entity_path_prefix: Option<EntityPath>,
    static_: bool,
) -> RecordingStreamResult<()> {
    // No in-memory contents: the loaders read the file themselves.
    let contents = None;
    let prefer_current_recording = true;

    self.log_file(
        filepath,
        contents,
        entity_path_prefix,
        static_,
        prefer_current_recording,
    )
}
1325
/// Logs the given `contents` using all [`re_data_loader::DataLoader`]s available.
///
/// A single `path` might be handled by more than one loader.
///
/// This method blocks until either at least one [`re_data_loader::DataLoader`] starts
/// streaming data in or all of them fail.
///
/// See <https://www.rerun.io/docs/reference/data-loaders/overview> for more information.
#[cfg(feature = "data_loaders")]
pub fn log_file_from_contents(
    &self,
    filepath: impl AsRef<std::path::Path>,
    contents: std::borrow::Cow<'_, [u8]>,
    entity_path_prefix: Option<EntityPath>,
    static_: bool,
) -> RecordingStreamResult<()> {
    let prefer_current_recording = true;

    self.log_file(
        filepath,
        Some(contents),
        entity_path_prefix,
        static_,
        prefer_current_recording,
    )
}
1344
/// Shared implementation of [`Self::log_file_from_path`] and [`Self::log_file_from_contents`].
///
/// Runs the dataloaders on `filepath` (or on `contents`, when provided), then spawns a thread
/// that records every message they produce into this recording. The thread's handle is
/// registered with the recording so that it can be joined later.
///
/// If `prefer_current_recording` is set (which is always the case for now), the dataloader settings
/// will be configured as if the current SDK recording is the currently opened recording.
/// Most dataloaders prefer logging to the currently opened recording if one is set.
///
/// Unless `static_` is set, the dataloaders are given the stream's current time, including a
/// fresh `log_tick`.
///
/// # Errors
///
/// Returns an error if loading fails to start or if the forwarding thread cannot be spawned.
/// A stream without store info only logs a warning and returns `Ok(())`.
#[cfg(feature = "data_loaders")]
fn log_file(
    &self,
    filepath: impl AsRef<std::path::Path>,
    contents: Option<std::borrow::Cow<'_, [u8]>>,
    entity_path_prefix: Option<EntityPath>,
    static_: bool,
    prefer_current_recording: bool,
) -> RecordingStreamResult<()> {
    // `store_info()` already returns an owned value: cloning it again is pure overhead.
    let Some(store_info) = self.store_info() else {
        re_log::warn!(
            "Ignored call to log_file() because RecordingStream has not been properly initialized"
        );
        return Ok(());
    };

    let filepath = filepath.as_ref();
    let has_contents = contents.is_some();

    let (tx, rx) = re_smart_channel::smart_channel(
        re_smart_channel::SmartMessageSource::Sdk,
        re_smart_channel::SmartChannelSource::File(filepath.into()),
    );

    let mut settings = crate::DataLoaderSettings {
        application_id: Some(store_info.application_id.clone()),
        opened_application_id: None,
        store_id: store_info.store_id.clone(),
        opened_store_id: None,
        force_store_info: false,
        entity_path_prefix,
        timepoint: (!static_).then(|| {
            self.with(|inner| {
                // Get the current time on all timelines, for the current recording, on the current
                // thread…
                let mut now = self.now();

                // …and then also inject the current recording tick into it.
                let tick = inner
                    .tick
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                now.insert_cell(TimelineName::log_tick(), TimeCell::from_sequence(tick));

                now
            })
            .unwrap_or_default()
        }),
    };

    if prefer_current_recording {
        settings.opened_application_id = Some(store_info.application_id.clone());
        settings.opened_store_id = Some(store_info.store_id);
    }

    if let Some(contents) = contents {
        re_data_loader::load_from_file_contents(
            &settings,
            re_log_types::FileSource::Sdk,
            filepath,
            contents,
            &tx,
        )?;
    } else {
        re_data_loader::load_from_path(
            &settings,
            re_log_types::FileSource::Sdk,
            filepath,
            &tx,
        )?;
    }
    // Drop our own sender so that only the loaders' senders keep the channel open.
    drop(tx);

    let thread_name = if has_contents {
        format!("log_file_from_contents({filepath:?})")
    } else {
        format!("log_file_from_path({filepath:?})")
    };
    let handle = std::thread::Builder::new()
        .name(thread_name.clone())
        .spawn({
            // Only a weak handle: the thread must not keep the recording alive by itself.
            let this = self.clone_weak();
            move || {
                // We can safely ignore the error on `recv()` as we're in complete control of
                // both ends of the channel.
                while let Some(msg) = rx.recv().ok().and_then(|msg| msg.into_data()) {
                    this.record_msg(msg);
                }
            }
        })
        .map_err(|err| RecordingStreamError::SpawnThread {
            name: thread_name,
            err,
        })?;

    // Register the thread so that it gets joined before the recording goes away.
    self.with(|inner| inner.dataloader_handles.lock().push(handle));

    Ok(())
}
1446}
1447
1448#[allow(clippy::needless_pass_by_value)]
1449fn forwarding_thread(
1450 info: StoreInfo,
1451 mut sink: Box<dyn LogSink>,
1452 cmds_rx: Receiver<Command>,
1453 chunks: Receiver<Chunk>,
1454 on_release: Option<ArrowRecordBatchReleaseCallback>,
1455) {
1456 /// Returns `true` to indicate that processing can continue; i.e. `false` means immediate
1457 /// shutdown.
1458 fn handle_cmd(info: &StoreInfo, cmd: Command, sink: &mut Box<dyn LogSink>) -> bool {
1459 match cmd {
1460 Command::RecordMsg(msg) => {
1461 sink.send(msg);
1462 }
1463 Command::SwapSink(new_sink) => {
1464 re_log::trace!("Swapping sink…");
1465 let backlog = {
1466 // Capture the backlog if it exists.
1467 let backlog = sink.drain_backlog();
1468
1469 // Flush the underlying sink if possible.
1470 sink.drop_if_disconnected();
1471 sink.flush_blocking();
1472
1473 backlog
1474 };
1475
1476 // Send the recording info to the new sink. This is idempotent.
1477 {
1478 re_log::debug!(
1479 app_id = %info.application_id,
1480 rec_id = %info.store_id,
1481 "setting recording info",
1482 );
1483 new_sink.send(
1484 re_log_types::SetStoreInfo {
1485 row_id: *RowId::new(),
1486 info: info.clone(),
1487 }
1488 .into(),
1489 );
1490 new_sink.send_all(backlog);
1491 }
1492
1493 *sink = new_sink;
1494 }
1495 Command::Flush(oneshot) => {
1496 re_log::trace!("Flushing…");
1497 // Flush the underlying sink if possible.
1498 sink.drop_if_disconnected();
1499 sink.flush_blocking();
1500 drop(oneshot); // signals the oneshot
1501 }
1502 Command::PopPendingChunks => {
1503 // Wake up and skip the current iteration so that we can drain all pending chunks
1504 // before handling the next command.
1505 }
1506 Command::Shutdown => return false,
1507 }
1508
1509 true
1510 }
1511
1512 use crossbeam::select;
1513 loop {
1514 // NOTE: Always pop chunks first, this is what makes `Command::PopPendingChunks` possible,
1515 // which in turns makes `RecordingStream::flush_blocking` well defined.
1516 while let Ok(chunk) = chunks.try_recv() {
1517 let mut msg = match chunk.to_arrow_msg() {
1518 Ok(chunk) => chunk,
1519 Err(err) => {
1520 re_log::error!(%err, "couldn't serialize chunk; data dropped (this is a bug in Rerun!)");
1521 continue;
1522 }
1523 };
1524 msg.on_release = on_release.clone();
1525 sink.send(LogMsg::ArrowMsg(info.store_id.clone(), msg));
1526 }
1527
1528 select! {
1529 recv(chunks) -> res => {
1530 let Ok(chunk) = res else {
1531 // The batcher is gone, which can only happen if the `RecordingStream` itself
1532 // has been dropped.
1533 re_log::trace!("Shutting down forwarding_thread: batcher is gone");
1534 break;
1535 };
1536
1537 let msg = match chunk.to_arrow_msg() {
1538 Ok(chunk) => chunk,
1539 Err(err) => {
1540 re_log::error!(%err, "couldn't serialize chunk; data dropped (this is a bug in Rerun!)");
1541 continue;
1542 }
1543 };
1544
1545 sink.send(LogMsg::ArrowMsg(info.store_id.clone(), msg));
1546 }
1547
1548 recv(cmds_rx) -> res => {
1549 let Ok(cmd) = res else {
1550 // All command senders are gone, which can only happen if the
1551 // `RecordingStream` itself has been dropped.
1552 re_log::trace!("Shutting down forwarding_thread: all command senders are gone");
1553 break;
1554 };
1555 if !handle_cmd(&info, cmd, &mut sink) {
1556 break; // shutdown
1557 }
1558 }
1559 }
1560
1561 // NOTE: The receiving end of the command stream is owned solely by this thread.
1562 // Past this point, all command writes will return `ErrDisconnected`.
1563 }
1564}
1565
1566impl RecordingStream {
1567 /// Check if logging is enabled on this `RecordingStream`.
1568 ///
1569 /// If not, all recording calls will be ignored.
1570 #[inline]
1571 pub fn is_enabled(&self) -> bool {
1572 self.with(|_| true).unwrap_or(false)
1573 }
1574
1575 /// The [`StoreInfo`] associated with this `RecordingStream`.
1576 #[inline]
1577 pub fn store_info(&self) -> Option<StoreInfo> {
1578 self.with(|inner| inner.info.clone())
1579 }
1580
1581 /// Determine whether a fork has happened since creating this `RecordingStream`. In general, this means our
1582 /// batcher/sink threads are gone and all data logged since the fork has been dropped.
1583 ///
1584 /// It is essential that [`crate::cleanup_if_forked_child`] be called after forking the process. SDK-implementations
1585 /// should do this during their initialization phase.
1586 #[inline]
1587 pub fn is_forked_child(&self) -> bool {
1588 self.with(|inner| inner.is_forked_child()).unwrap_or(false)
1589 }
1590}
1591
1592impl RecordingStream {
1593 /// Records an arbitrary [`LogMsg`].
1594 #[inline]
1595 pub fn record_msg(&self, msg: LogMsg) {
1596 let f = move |inner: &RecordingStreamInner| {
1597 // NOTE: Internal channels can never be closed outside of the `Drop` impl, this send cannot
1598 // fail.
1599 inner.cmds_tx.send(Command::RecordMsg(msg)).ok();
1600 inner
1601 .tick
1602 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1603 };
1604
1605 if self.with(f).is_none() {
1606 re_log::warn_once!("Recording disabled - call to record_msg() ignored");
1607 }
1608 }
1609
1610 /// Records a single [`PendingRow`].
1611 ///
1612 /// If `inject_time` is set to `true`, the row's timestamp data will be overridden using the
1613 /// [`RecordingStream`]'s internal clock.
1614 ///
1615 /// Internally, incoming [`PendingRow`]s are automatically coalesced into larger [`Chunk`]s to
1616 /// optimize for transport.
1617 #[inline]
1618 pub fn record_row(&self, entity_path: EntityPath, mut row: PendingRow, inject_time: bool) {
1619 let f = move |inner: &RecordingStreamInner| {
1620 // NOTE: We're incrementing the current tick still.
1621 let tick = inner
1622 .tick
1623 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1624 if inject_time {
1625 // Get the current time on all timelines, for the current recording, on the current
1626 // thread…
1627 let mut now = self.now();
1628 // …and then also inject the current recording tick into it.
1629 now.insert_cell(TimelineName::log_tick(), TimeCell::from_sequence(tick));
1630
1631 // Inject all these times into the row, overriding conflicting times, if any.
1632 for (timeline, cell) in now {
1633 row.timepoint.insert_cell(timeline, cell);
1634 }
1635 }
1636
1637 inner.batcher.push_row(entity_path, row);
1638 };
1639
1640 if self.with(f).is_none() {
1641 re_log::warn_once!("Recording disabled - call to record_row() ignored");
1642 }
1643 }
1644
1645 /// Logs a single [`Chunk`].
1646 ///
1647 /// Will inject `log_tick` and `log_time` timeline columns into the chunk.
1648 /// If you don't want to inject these, use [`Self::send_chunk`] instead.
1649 #[inline]
1650 pub fn log_chunk(&self, mut chunk: Chunk) {
1651 let f = move |inner: &RecordingStreamInner| {
1652 // TODO(cmc): Repeating these values is pretty wasteful. Would be nice to have a way of
1653 // indicating these are fixed across the whole chunk.
1654 // Inject the log time
1655 {
1656 let time_timeline = Timeline::log_time();
1657 let time =
1658 TimeInt::new_temporal(re_log_types::Timestamp::now().nanos_since_epoch());
1659
1660 let repeated_time = std::iter::repeat(time.as_i64())
1661 .take(chunk.num_rows())
1662 .collect();
1663
1664 let time_column = TimeColumn::new(Some(true), time_timeline, repeated_time);
1665
1666 if let Err(err) = chunk.add_timeline(time_column) {
1667 re_log::error!(
1668 "Couldn't inject '{}' timeline into chunk (this is a bug in Rerun!): {}",
1669 time_timeline.name(),
1670 err
1671 );
1672 return;
1673 }
1674 }
1675 // Inject the log tick
1676 {
1677 let tick_timeline = Timeline::log_tick();
1678
1679 let tick = inner
1680 .tick
1681 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1682
1683 let repeated_tick = std::iter::repeat(tick).take(chunk.num_rows()).collect();
1684
1685 let tick_chunk = TimeColumn::new(Some(true), tick_timeline, repeated_tick);
1686
1687 if let Err(err) = chunk.add_timeline(tick_chunk) {
1688 re_log::error!(
1689 "Couldn't inject '{}' timeline into chunk (this is a bug in Rerun!): {}",
1690 tick_timeline.name(),
1691 err
1692 );
1693 return;
1694 }
1695 }
1696
1697 inner.batcher.push_chunk(chunk);
1698 };
1699
1700 if self.with(f).is_none() {
1701 re_log::warn_once!("Recording disabled - call to log_chunk() ignored");
1702 }
1703 }
1704
1705 /// Logs multiple [`Chunk`]s.
1706 ///
1707 /// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunk,
1708 /// for that use [`Self::log_chunks`].
1709 pub fn log_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
1710 for chunk in chunks {
1711 self.log_chunk(chunk);
1712 }
1713 }
1714
1715 /// Records a single [`Chunk`].
1716 ///
1717 /// Will inject `log_tick` and `log_time` timeline columns into the chunk.
1718 /// If you don't want to inject these, use [`Self::send_chunks`] instead.
1719 #[inline]
1720 pub fn send_chunk(&self, chunk: Chunk) {
1721 let f = move |inner: &RecordingStreamInner| {
1722 inner.batcher.push_chunk(chunk);
1723 };
1724
1725 if self.with(f).is_none() {
1726 re_log::warn_once!("Recording disabled - call to send_chunk() ignored");
1727 }
1728 }
1729
1730 /// Records multiple [`Chunk`]s.
1731 ///
1732 /// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunk,
1733 /// for that use [`Self::log_chunks`].
1734 pub fn send_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
1735 for chunk in chunks {
1736 self.send_chunk(chunk);
1737 }
1738 }
1739
1740 /// Swaps the underlying sink for a new one.
1741 ///
1742 /// This guarantees that:
1743 /// 1. all pending rows and chunks are batched, collected and sent down the current sink,
1744 /// 2. the current sink is flushed if it has pending data in its buffers,
1745 /// 3. the current sink's backlog, if there's any, is forwarded to the new sink.
1746 ///
1747 /// When this function returns, the calling thread is guaranteed that all future record calls
1748 /// will end up in the new sink.
1749 ///
1750 /// ## Data loss
1751 ///
1752 /// If the current sink is in a broken state (e.g. a gRPC sink with a broken connection that
1753 /// cannot be repaired), all pending data in its buffers will be dropped.
1754 pub fn set_sink(&self, sink: Box<dyn LogSink>) {
1755 if self.is_forked_child() {
1756 re_log::error_once!(
1757 "Fork detected during set_sink. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
1758 );
1759 return;
1760 }
1761
1762 let f = move |inner: &RecordingStreamInner| {
1763 // NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends
1764 // are safe.
1765
1766 // 1. Flush the batcher down the chunk channel
1767 inner.batcher.flush_blocking();
1768
1769 // 2. Receive pending chunks from the batcher's channel
1770 inner.cmds_tx.send(Command::PopPendingChunks).ok();
1771
1772 // 3. Swap the sink, which will internally make sure to re-ingest the backlog if needed
1773 inner.cmds_tx.send(Command::SwapSink(sink)).ok();
1774
1775 // 4. Before we give control back to the caller, we need to make sure that the swap has
1776 // taken place: we don't want the user to send data to the old sink!
1777 re_log::trace!("Waiting for sink swap to complete…");
1778 let (cmd, oneshot) = Command::flush();
1779 inner.cmds_tx.send(cmd).ok();
1780 oneshot.recv().ok();
1781 re_log::trace!("Sink swap completed.");
1782 };
1783
1784 if self.with(f).is_none() {
1785 re_log::warn_once!("Recording disabled - call to set_sink() ignored");
1786 }
1787 }
1788
1789 /// Initiates a flush of the pipeline and returns immediately.
1790 ///
1791 /// This does **not** wait for the flush to propagate (see [`Self::flush_blocking`]).
1792 /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
1793 pub fn flush_async(&self) {
1794 if self.is_forked_child() {
1795 re_log::error_once!(
1796 "Fork detected during flush_async. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
1797 );
1798 return;
1799 }
1800
1801 let f = move |inner: &RecordingStreamInner| {
1802 // NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends
1803 // are safe.
1804
1805 // 1. Synchronously flush the batcher down the chunk channel
1806 //
1807 // NOTE: This _has_ to be done synchronously as we need to be guaranteed that all chunks
1808 // are ready to be drained by the time this call returns.
1809 // It cannot block indefinitely and is fairly fast as it only requires compute (no I/O).
1810 inner.batcher.flush_blocking();
1811
1812 // 2. Drain all pending chunks from the batcher's channel _before_ any other future command
1813 inner.cmds_tx.send(Command::PopPendingChunks).ok();
1814
1815 // 3. Asynchronously flush everything down the sink
1816 let (cmd, _) = Command::flush();
1817 inner.cmds_tx.send(cmd).ok();
1818 };
1819
1820 if self.with(f).is_none() {
1821 re_log::warn_once!("Recording disabled - call to flush_async() ignored");
1822 }
1823 }
1824
1825 /// Initiates a flush the batching pipeline and waits for it to propagate.
1826 ///
1827 /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
1828 pub fn flush_blocking(&self) {
1829 if self.is_forked_child() {
1830 re_log::error_once!(
1831 "Fork detected during flush. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."
1832 );
1833 return;
1834 }
1835
1836 let f = move |inner: &RecordingStreamInner| {
1837 // NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends
1838 // are safe.
1839
1840 // 1. Flush the batcher down the chunk channel
1841 inner.batcher.flush_blocking();
1842
1843 // 2. Drain all pending chunks from the batcher's channel _before_ any other future command
1844 inner.cmds_tx.send(Command::PopPendingChunks).ok();
1845
1846 // 3. Wait for all chunks to have been forwarded down the sink
1847 let (cmd, oneshot) = Command::flush();
1848 inner.cmds_tx.send(cmd).ok();
1849 oneshot.recv().ok();
1850 };
1851
1852 if self.with(f).is_none() {
1853 re_log::warn_once!("Recording disabled - call to flush_blocking() ignored");
1854 }
1855 }
1856}
1857
1858impl RecordingStream {
1859 /// Swaps the underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to use
1860 /// the specified address.
1861 ///
1862 /// See also [`Self::connect_grpc_opts`] if you wish to configure the connection.
1863 ///
1864 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
1865 /// terms of data durability and ordering.
1866 /// See [`Self::set_sink`] for more information.
1867 pub fn connect_grpc(&self) -> RecordingStreamResult<()> {
1868 self.connect_grpc_opts(
1869 format!(
1870 "rerun+http://127.0.0.1:{}/proxy",
1871 re_grpc_server::DEFAULT_SERVER_PORT
1872 ),
1873 crate::default_flush_timeout(),
1874 )
1875 }
1876
1877 /// Swaps the underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to use
1878 /// the specified address.
1879 ///
1880 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
1881 /// terms of data durability and ordering.
1882 /// See [`Self::set_sink`] for more information.
1883 ///
1884 /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
1885 /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
1886 /// call to `flush` to block indefinitely if a connection cannot be established.
1887 pub fn connect_grpc_opts(
1888 &self,
1889 url: impl Into<String>,
1890 flush_timeout: Option<Duration>,
1891 ) -> RecordingStreamResult<()> {
1892 if forced_sink_path().is_some() {
1893 re_log::debug!("Ignored setting new GrpcSink since {ENV_FORCE_SAVE} is set");
1894 return Ok(());
1895 }
1896
1897 let url: String = url.into();
1898 let re_uri::RedapUri::Proxy(uri) = url.as_str().parse()? else {
1899 return Err(RecordingStreamError::NotAProxyEndpoint);
1900 };
1901
1902 let sink = crate::log_sink::GrpcSink::new(uri, flush_timeout);
1903
1904 self.set_sink(Box::new(sink));
1905 Ok(())
1906 }
1907
#[cfg(feature = "server")]
/// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] bound to
/// `0.0.0.0` on the default server port (`crate::DEFAULT_SERVER_PORT`).
///
/// To configure the gRPC server's IP and port, use [`Self::serve_grpc_opts`] instead.
///
/// You can connect a viewer to it with `rerun --connect`.
///
/// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
/// You can limit the amount of data buffered by the gRPC server with the `server_memory_limit` argument.
/// Once reached, the earliest logged data will be dropped. Static data is never dropped.
pub fn serve_grpc(
    &self,
    server_memory_limit: re_memory::MemoryLimit,
) -> RecordingStreamResult<()> {
    let bind_ip = "0.0.0.0";
    let port = crate::DEFAULT_SERVER_PORT;

    self.serve_grpc_opts(bind_ip, port, server_memory_limit)
}
1925
#[cfg(feature = "server")]
/// Swaps the underlying sink for a [`crate::grpc_server::GrpcServerSink`] pre-configured to listen on
/// `rerun+http://{bind_ip}:{port}/proxy`.
///
/// `0.0.0.0` is a good default for `bind_ip`.
///
/// The call is a no-op when the force-save environment variable is set.
///
/// The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
/// You can limit the amount of data buffered by the gRPC server with the `server_memory_limit` argument.
/// Once reached, the earliest logged data will be dropped. Static data is never dropped.
pub fn serve_grpc_opts(
    &self,
    bind_ip: impl AsRef<str>,
    port: u16,
    server_memory_limit: re_memory::MemoryLimit,
) -> RecordingStreamResult<()> {
    // Tests can pin every stream to a single file sink; never replace it.
    if forced_sink_path().is_some() {
        re_log::debug!("Ignored setting GrpcServerSink since {ENV_FORCE_SAVE} is set");
        return Ok(());
    }

    let server_sink: Box<dyn LogSink> = Box::new(crate::grpc_server::GrpcServerSink::new(
        bind_ip.as_ref(),
        port,
        server_memory_limit,
    )?);
    self.set_sink(server_sink);

    Ok(())
}
1952
1953 /// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
1954 /// underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to send data to that
1955 /// new process.
1956 ///
1957 /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
1958 /// that viewer instead of starting a new one.
1959 ///
1960 /// See also [`Self::spawn_opts`] if you wish to configure the behavior of thew Rerun process
1961 /// as well as the underlying connection.
1962 ///
1963 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
1964 /// terms of data durability and ordering.
1965 /// See [`Self::set_sink`] for more information.
1966 pub fn spawn(&self) -> RecordingStreamResult<()> {
1967 self.spawn_opts(&Default::default(), crate::default_flush_timeout())
1968 }
1969
1970 /// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
1971 /// underlying sink for a [`crate::log_sink::GrpcSink`] sink pre-configured to send data to that
1972 /// new process.
1973 ///
1974 /// If a Rerun Viewer is already listening on this port, the stream will be redirected to
1975 /// that viewer instead of starting a new one.
1976 ///
1977 /// The behavior of the spawned Viewer can be configured via `opts`.
1978 /// If you're fine with the default behavior, refer to the simpler [`Self::spawn`].
1979 ///
1980 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
1981 /// terms of data durability and ordering.
1982 /// See [`Self::set_sink`] for more information.
1983 ///
1984 /// `flush_timeout` is the minimum time the [`GrpcSink`][`crate::log_sink::GrpcSink`] will
1985 /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
1986 /// call to `flush` to block indefinitely if a connection cannot be established.
1987 pub fn spawn_opts(
1988 &self,
1989 opts: &crate::SpawnOptions,
1990 flush_timeout: Option<Duration>,
1991 ) -> RecordingStreamResult<()> {
1992 if !self.is_enabled() {
1993 re_log::debug!("Rerun disabled - call to spawn() ignored");
1994 return Ok(());
1995 }
1996 if forced_sink_path().is_some() {
1997 re_log::debug!("Ignored setting new GrpcSink since {ENV_FORCE_SAVE} is set");
1998 return Ok(());
1999 }
2000
2001 crate::spawn(opts)?;
2002
2003 self.connect_grpc_opts(
2004 format!("rerun+http://{}/proxy", opts.connect_addr()),
2005 flush_timeout,
2006 )?;
2007
2008 Ok(())
2009 }
2010
2011 /// Swaps the underlying sink for a [`crate::sink::MemorySink`] sink and returns the associated
2012 /// [`MemorySinkStorage`].
2013 ///
2014 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2015 /// terms of data durability and ordering.
2016 /// See [`Self::set_sink`] for more information.
2017 pub fn memory(&self) -> MemorySinkStorage {
2018 let sink = crate::sink::MemorySink::new(self.clone());
2019 let storage = sink.buffer();
2020 self.set_sink(Box::new(sink));
2021 storage
2022 }
2023
2024 /// Swaps the underlying sink for a [`crate::sink::BinaryStreamSink`] sink and returns the associated
2025 /// [`BinaryStreamStorage`].
2026 ///
2027 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2028 /// terms of data durability and ordering.
2029 /// See [`Self::set_sink`] for more information.
2030 pub fn binary_stream(&self) -> BinaryStreamStorage {
2031 let (sink, storage) = crate::sink::BinaryStreamSink::new(self.clone());
2032 self.set_sink(Box::new(sink));
2033 storage
2034 }
2035
2036 /// Swaps the underlying sink for a [`crate::sink::FileSink`] at the specified `path`.
2037 ///
2038 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2039 /// terms of data durability and ordering.
2040 /// See [`Self::set_sink`] for more information.
2041 pub fn save(
2042 &self,
2043 path: impl Into<std::path::PathBuf>,
2044 ) -> Result<(), crate::sink::FileSinkError> {
2045 self.save_opts(path)
2046 }
2047
2048 /// Swaps the underlying sink for a [`crate::sink::FileSink`] at the specified `path`.
2049 ///
2050 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2051 /// terms of data durability and ordering.
2052 /// See [`Self::set_sink`] for more information.
2053 ///
2054 /// If a blueprint was provided, it will be stored first in the file.
2055 /// Blueprints are currently an experimental part of the Rust SDK.
2056 pub fn save_opts(
2057 &self,
2058 path: impl Into<std::path::PathBuf>,
2059 ) -> Result<(), crate::sink::FileSinkError> {
2060 if forced_sink_path().is_some() {
2061 re_log::debug!("Ignored setting new file since {ENV_FORCE_SAVE} is set");
2062 return Ok(());
2063 }
2064
2065 let sink = crate::sink::FileSink::new(path)?;
2066
2067 self.set_sink(Box::new(sink));
2068
2069 Ok(())
2070 }
2071
2072 /// Swaps the underlying sink for a [`crate::sink::FileSink`] pointed at stdout.
2073 ///
2074 /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
2075 /// default back to `buffered` mode, in order not to break the user's terminal.
2076 ///
2077 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2078 /// terms of data durability and ordering.
2079 /// See [`Self::set_sink`] for more information.
2080 pub fn stdout(&self) -> Result<(), crate::sink::FileSinkError> {
2081 self.stdout_opts()
2082 }
2083
2084 /// Swaps the underlying sink for a [`crate::sink::FileSink`] pointed at stdout.
2085 ///
2086 /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
2087 /// default back to `buffered` mode, in order not to break the user's terminal.
2088 ///
2089 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2090 /// terms of data durability and ordering.
2091 /// See [`Self::set_sink`] for more information.
2092 ///
2093 /// If a blueprint was provided, it will be stored first in the file.
2094 /// Blueprints are currently an experimental part of the Rust SDK.
2095 pub fn stdout_opts(&self) -> Result<(), crate::sink::FileSinkError> {
2096 if forced_sink_path().is_some() {
2097 re_log::debug!("Ignored setting new file since {ENV_FORCE_SAVE} is set");
2098 return Ok(());
2099 }
2100
2101 if std::io::stdout().is_terminal() {
2102 re_log::debug!("Ignored call to stdout() because stdout is a terminal");
2103 self.set_sink(Box::new(crate::log_sink::BufferedSink::new()));
2104 return Ok(());
2105 }
2106
2107 let sink = crate::sink::FileSink::stdout()?;
2108
2109 self.set_sink(Box::new(sink));
2110
2111 Ok(())
2112 }
2113
2114 /// Swaps the underlying sink for a [`crate::sink::BufferedSink`].
2115 ///
2116 /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
2117 /// terms of data durability and ordering.
2118 /// See [`Self::set_sink`] for more information.
2119 pub fn disconnect(&self) {
2120 let f = move |inner: &RecordingStreamInner| {
2121 // When disconnecting, we need to make sure that pending top-level `DataLoader` threads that
2122 // were started from the SDK run to completion.
2123 inner.wait_for_dataloaders();
2124 self.set_sink(Box::new(crate::sink::BufferedSink::new()));
2125 };
2126
2127 if self.with(f).is_none() {
2128 re_log::warn_once!("Recording disabled - call to disconnect() ignored");
2129 }
2130 }
2131
2132 /// Send a blueprint through this recording stream
2133 pub fn send_blueprint(
2134 &self,
2135 blueprint: Vec<LogMsg>,
2136 activation_cmd: BlueprintActivationCommand,
2137 ) {
2138 let mut blueprint_id = None;
2139 for msg in blueprint {
2140 if blueprint_id.is_none() {
2141 blueprint_id = Some(msg.store_id().clone());
2142 }
2143 self.record_msg(msg);
2144 }
2145
2146 if let Some(blueprint_id) = blueprint_id {
2147 if blueprint_id == activation_cmd.blueprint_id {
2148 // Let the viewer know that the blueprint has been fully received,
2149 // and that it can now be activated.
2150 // We don't want to activate half-loaded blueprints, because that can be confusing,
2151 // and can also lead to problems with view heuristics.
2152 self.record_msg(activation_cmd.into());
2153 } else {
2154 re_log::warn!(
2155 "Blueprint ID mismatch when sending blueprint: {} != {}. Ignoring activation.",
2156 blueprint_id,
2157 activation_cmd.blueprint_id
2158 );
2159 }
2160 }
2161 }
2162}
2163
2164impl fmt::Debug for RecordingStream {
2165 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2166 let with = |inner: &RecordingStreamInner| {
2167 let RecordingStreamInner {
2168 // This pattern match prevents _accidentally_ omitting data from the debug output
2169 // when new fields are added.
2170 info,
2171 properties,
2172 tick,
2173 cmds_tx: _,
2174 batcher: _,
2175 batcher_to_sink_handle: _,
2176 dataloader_handles,
2177 pid_at_creation,
2178 } = inner;
2179
2180 f.debug_struct("RecordingStream")
2181 .field("info", &info)
2182 .field("properties", &properties)
2183 .field("tick", &tick)
2184 .field("pending_dataloaders", &dataloader_handles.lock().len())
2185 .field("pid_at_creation", &pid_at_creation)
2186 .finish_non_exhaustive()
2187 };
2188
2189 match self.with(with) {
2190 Some(res) => res,
2191 None => write!(f, "RecordingStream {{ disabled }}"),
2192 }
2193 }
2194}
2195
2196// --- Stateful time ---
2197
2198/// Thread-local data.
2199#[derive(Default)]
2200struct ThreadInfo {
2201 /// The current time per-thread per-recording, which can be set by users.
2202 timepoints: HashMap<StoreId, TimePoint>,
2203}
2204
2205impl ThreadInfo {
2206 fn thread_now(rid: &StoreId) -> TimePoint {
2207 Self::with(|ti| ti.now(rid))
2208 }
2209
2210 fn set_thread_time(rid: &StoreId, timeline: TimelineName, cell: TimeCell) {
2211 Self::with(|ti| ti.set_time(rid, timeline, cell));
2212 }
2213
2214 fn unset_thread_time(rid: &StoreId, timeline: &TimelineName) {
2215 Self::with(|ti| ti.unset_time(rid, timeline));
2216 }
2217
2218 fn reset_thread_time(rid: &StoreId) {
2219 Self::with(|ti| ti.reset_time(rid));
2220 }
2221
2222 /// Get access to the thread-local [`ThreadInfo`].
2223 fn with<R>(f: impl FnOnce(&mut Self) -> R) -> R {
2224 use std::cell::RefCell;
2225 thread_local! {
2226 static THREAD_INFO: RefCell<Option<ThreadInfo>> = const { RefCell::new(None) };
2227 }
2228
2229 THREAD_INFO.with(|thread_info| {
2230 let mut thread_info = thread_info.borrow_mut();
2231 let thread_info = thread_info.get_or_insert_with(Self::default);
2232 f(thread_info)
2233 })
2234 }
2235
2236 fn now(&self, rid: &StoreId) -> TimePoint {
2237 let mut timepoint = self.timepoints.get(rid).cloned().unwrap_or_default();
2238 timepoint.insert_cell(TimelineName::log_time(), TimeCell::timestamp_now());
2239 timepoint
2240 }
2241
2242 fn set_time(&mut self, rid: &StoreId, timeline: TimelineName, cell: TimeCell) {
2243 self.timepoints
2244 .entry(rid.clone())
2245 .or_default()
2246 .insert_cell(timeline, cell);
2247 }
2248
2249 fn unset_time(&mut self, rid: &StoreId, timeline: &TimelineName) {
2250 if let Some(timepoint) = self.timepoints.get_mut(rid) {
2251 timepoint.remove(timeline);
2252 }
2253 }
2254
2255 fn reset_time(&mut self, rid: &StoreId) {
2256 if let Some(timepoint) = self.timepoints.get_mut(rid) {
2257 *timepoint = TimePoint::default();
2258 }
2259 }
2260}
2261
2262impl RecordingStream {
2263 /// Returns the current time of the recording on the current thread.
2264 pub fn now(&self) -> TimePoint {
2265 let f = move |inner: &RecordingStreamInner| ThreadInfo::thread_now(&inner.info.store_id);
2266 if let Some(res) = self.with(f) {
2267 res
2268 } else {
2269 re_log::warn_once!("Recording disabled - call to now() ignored");
2270 TimePoint::default()
2271 }
2272 }
2273
2274 /// Set the current time of the recording, for the current calling thread.
2275 ///
2276 /// Used for all subsequent logging performed from this same thread, until the next call
2277 /// to one of the index/time setting methods.
2278 ///
2279 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2280 ///
2281 /// See also:
2282 /// - [`Self::set_time`]
2283 /// - [`Self::set_time_sequence`]
2284 /// - [`Self::set_duration_secs`]
2285 /// - [`Self::disable_timeline`]
2286 /// - [`Self::reset_time`]
2287 pub fn set_timepoint(&self, timepoint: impl Into<TimePoint>) {
2288 let f = move |inner: &RecordingStreamInner| {
2289 let timepoint = timepoint.into();
2290 for (timeline, time) in timepoint {
2291 ThreadInfo::set_thread_time(&inner.info.store_id, timeline, time);
2292 }
2293 };
2294
2295 if self.with(f).is_none() {
2296 re_log::warn_once!("Recording disabled - call to set_timepoint() ignored");
2297 }
2298 }
2299
2300 /// Set the current value of one of the timelines.
2301 ///
2302 /// Used for all subsequent logging performed from this same thread, until the next call
2303 /// to one of the index/time setting methods.
2304 ///
2305 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2306 ///
2307 /// Example:
2308 /// ```no_run
2309 /// # mod rerun { pub use re_sdk::*; }
2310 /// # let rec: rerun::RecordingStream = unimplemented!();
2311 /// rec.set_time("frame_nr", rerun::TimeCell::from_sequence(42));
2312 /// rec.set_time("duration", std::time::Duration::from_millis(123));
2313 /// rec.set_time("capture_time", std::time::SystemTime::now());
2314 /// ```
2315 ///
2316 /// See also:
2317 /// - [`Self::set_timepoint`]
2318 /// - [`Self::set_time_sequence`]
2319 /// - [`Self::set_duration_secs`]
2320 /// - [`Self::disable_timeline`]
2321 /// - [`Self::reset_time`]
2322 pub fn set_time(&self, timeline: impl Into<TimelineName>, value: impl TryInto<TimeCell>) {
2323 let f = move |inner: &RecordingStreamInner| {
2324 let timeline = timeline.into();
2325 if let Ok(value) = value.try_into() {
2326 ThreadInfo::set_thread_time(&inner.info.store_id, timeline, value);
2327 } else {
2328 re_log::warn_once!(
2329 "set_time({timeline}): Failed to convert the given value to an TimeCell"
2330 );
2331 }
2332 };
2333
2334 if self.with(f).is_none() {
2335 re_log::warn_once!("Recording disabled - call to set_time() ignored");
2336 }
2337 }
2338
2339 /// Set the current time of the recording, for the current calling thread.
2340 ///
2341 /// Short for `set_time(timeline, rerun::TimeCell::from_sequence(sequence))`.
2342 ///
2343 /// Used for all subsequent logging performed from this same thread, until the next call
2344 /// to one of the index/time setting methods.
2345 ///
2346 /// For example: `rec.set_time_sequence("frame_nr", frame_nr)`.
2347 /// You can remove a timeline again using `rec.disable_timeline("frame_nr")`.
2348 ///
2349 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2350 ///
2351 /// See also:
2352 /// - [`Self::set_time`]
2353 /// - [`Self::set_timepoint`]
2354 /// - [`Self::set_duration_secs`]
2355 /// - [`Self::disable_timeline`]
2356 /// - [`Self::reset_time`]
2357 #[inline]
2358 pub fn set_time_sequence(&self, timeline: impl Into<TimelineName>, sequence: impl Into<i64>) {
2359 self.set_time(timeline, TimeCell::from_sequence(sequence.into()));
2360 }
2361
2362 /// Set the current time of the recording, for the current calling thread.
2363 ///
2364 /// Short for `set_time(timeline, std::time::Duration::from_secs_f64(secs))`..
2365 ///
2366 /// Used for all subsequent logging performed from this same thread, until the next call
2367 /// to one of the index/time setting methods.
2368 ///
2369 /// For example: `rec.set_duration_secs("time_since_start", time_offset)`.
2370 /// You can remove a timeline again using `rec.disable_timeline("time_since_start")`.
2371 ///
2372 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2373 ///
2374 /// See also:
2375 /// - [`Self::set_time`]
2376 /// - [`Self::set_timepoint`]
2377 /// - [`Self::set_timestamp_secs_since_epoch`]
2378 /// - [`Self::set_time_sequence`]
2379 /// - [`Self::disable_timeline`]
2380 /// - [`Self::reset_time`]
2381 #[inline]
2382 pub fn set_duration_secs(&self, timeline: impl Into<TimelineName>, secs: impl Into<f64>) {
2383 self.set_time(timeline, std::time::Duration::from_secs_f64(secs.into()));
2384 }
2385
2386 /// Set a timestamp as seconds since Unix epoch (1970-01-01 00:00:00 UTC).
2387 ///
2388 /// Short for `self.set_time(timeline, rerun::TimeCell::from_timestamp_secs_since_epoch(secs))`.
2389 ///
2390 /// Used for all subsequent logging performed from this same thread, until the next call
2391 /// to one of the index/time setting methods.
2392 ///
2393 /// For example: `rec.set_duration_secs("time_since_start", time_offset)`.
2394 /// You can remove a timeline again using `rec.disable_timeline("time_since_start")`.
2395 ///
2396 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2397 ///
2398 /// See also:
2399 /// - [`Self::set_time`]
2400 /// - [`Self::set_timepoint`]
2401 /// - [`Self::set_duration_secs`]
2402 /// - [`Self::set_time_sequence`]
2403 /// - [`Self::disable_timeline`]
2404 /// - [`Self::reset_time`]
2405 #[inline]
2406 pub fn set_timestamp_secs_since_epoch(
2407 &self,
2408 timeline: impl Into<TimelineName>,
2409 secs: impl Into<f64>,
2410 ) {
2411 self.set_time(
2412 timeline,
2413 TimeCell::from_timestamp_secs_since_epoch(secs.into()),
2414 );
2415 }
2416
2417 /// Set the current time of the recording, for the current calling thread.
2418 ///
2419 /// Used for all subsequent logging performed from this same thread, until the next call
2420 /// to one of the index/time setting methods.
2421 ///
2422 /// For example: `rec.set_time_secs("sim_time", sim_time_secs)`.
2423 /// You can remove a timeline again using `rec.disable_timeline("sim_time")`.
2424 ///
2425 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2426 ///
2427 /// See also:
2428 /// - [`Self::set_timepoint`]
2429 /// - [`Self::set_time_sequence`]
2430 /// - [`Self::set_time_nanos`]
2431 /// - [`Self::disable_timeline`]
2432 /// - [`Self::reset_time`]
2433 #[deprecated(
2434 since = "0.23.0",
2435 note = "Use either `set_duration_secs` or `set_timestamp_secs_since_epoch` instead"
2436 )]
2437 #[inline]
2438 pub fn set_time_secs(&self, timeline: impl Into<TimelineName>, seconds: impl Into<f64>) {
2439 self.set_duration_secs(timeline, seconds);
2440 }
2441
2442 /// Set the current time of the recording, for the current calling thread.
2443 ///
2444 /// Used for all subsequent logging performed from this same thread, until the next call
2445 /// to one of the index/time setting methods.
2446 ///
2447 /// For example: `rec.set_time_nanos("sim_time", sim_time_nanos)`.
2448 /// You can remove a timeline again using `rec.disable_timeline("sim_time")`.
2449 ///
2450 /// There is no requirement of monotonicity. You can move the time backwards if you like.
2451 ///
2452 /// See also:
2453 /// - [`Self::set_timepoint`]
2454 /// - [`Self::set_time_sequence`]
2455 /// - [`Self::set_time_secs`]
2456 /// - [`Self::disable_timeline`]
2457 /// - [`Self::reset_time`]
2458 #[deprecated(
2459 since = "0.23.0",
2460 note = "Use `set_time` with either `rerun::TimeCell::from_duration_nanos` or `rerun::TimeCell::from_timestamp_nanos_since_epoch`, or with `std::time::Duration` or `std::time::SystemTime`."
2461 )]
2462 #[inline]
2463 pub fn set_time_nanos(
2464 &self,
2465 timeline: impl Into<TimelineName>,
2466 nanos_since_epoch: impl Into<i64>,
2467 ) {
2468 self.set_time(
2469 timeline,
2470 TimeCell::from_timestamp_nanos_since_epoch(nanos_since_epoch.into()),
2471 );
2472 }
2473
2474 /// Clears out the current time of the recording for the specified timeline, for the
2475 /// current calling thread.
2476 ///
2477 /// For example: `rec.disable_timeline("frame")`, `rec.disable_timeline("sim_time")`.
2478 ///
2479 /// See also:
2480 /// - [`Self::set_timepoint`]
2481 /// - [`Self::set_time_sequence`]
2482 /// - [`Self::set_time_secs`]
2483 /// - [`Self::set_time_nanos`]
2484 /// - [`Self::reset_time`]
2485 pub fn disable_timeline(&self, timeline: impl Into<TimelineName>) {
2486 let f = move |inner: &RecordingStreamInner| {
2487 let timeline = timeline.into();
2488 ThreadInfo::unset_thread_time(&inner.info.store_id, &timeline);
2489 };
2490
2491 if self.with(f).is_none() {
2492 re_log::warn_once!("Recording disabled - call to disable_timeline() ignored");
2493 }
2494 }
2495
2496 /// Clears out the current time of the recording, for the current calling thread.
2497 ///
2498 /// Used for all subsequent logging performed from this same thread, until the next call
2499 /// to one of the index/time setting methods.
2500 ///
2501 /// For example: `rec.reset_time()`.
2502 ///
2503 /// See also:
2504 /// - [`Self::set_timepoint`]
2505 /// - [`Self::set_time_sequence`]
2506 /// - [`Self::set_time_secs`]
2507 /// - [`Self::set_time_nanos`]
2508 /// - [`Self::disable_timeline`]
2509 pub fn reset_time(&self) {
2510 let f = move |inner: &RecordingStreamInner| {
2511 ThreadInfo::reset_thread_time(&inner.info.store_id);
2512 };
2513
2514 if self.with(f).is_none() {
2515 re_log::warn_once!("Recording disabled - call to reset_time() ignored");
2516 }
2517 }
2518}
2519
2520// ---
2521
2522#[cfg(test)]
2523mod tests {
2524 use re_log_types::example_components::MyLabel;
2525
2526 use super::*;
2527
/// Compile-time check: `RecordingStream` must be shareable across threads.
#[test]
fn impl_send_sync() {
    fn requires_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    requires_send_sync::<RecordingStream>();
}
2533
/// With a batcher that never flushes on its own, swapping the sink must still deliver
/// everything: two store infos followed by three chunks.
#[test]
fn never_flush() {
    let rec = RecordingStreamBuilder::new("rerun_example_never_flush")
        .enabled(true)
        .batcher_config(ChunkBatcherConfig::NEVER)
        .buffered()
        .unwrap();

    let store_info = rec.store_info().unwrap();

    // Checks that `msg` is a `SetStoreInfo` carrying this recording's store info.
    let expect_store_info = |msg: LogMsg| match msg {
        LogMsg::SetStoreInfo(msg) => {
            assert!(msg.row_id != *RowId::ZERO);
            similar_asserts::assert_eq!(store_info, msg.info);
        }
        _ => panic!("expected SetStoreInfo"),
    };

    // Checks that `msg` is an `ArrowMsg` for this recording holding a sane chunk.
    let expect_chunk = |msg: LogMsg| match msg {
        LogMsg::ArrowMsg(rid, msg) => {
            assert_eq!(store_info.store_id, rid);

            let chunk = Chunk::from_arrow_msg(&msg).unwrap();

            chunk.sanity_check().unwrap();
        }
        _ => panic!("expected ArrowMsg"),
    };

    for row in example_rows(false) {
        rec.record_row("a".into(), row, false);
    }

    let storage = rec.memory();
    let mut msgs = storage.take().into_iter();

    // 1st message: the set_store_info resulting from the original sink swap to buffered mode.
    expect_store_info(msgs.next().unwrap());

    // 2nd message: the set_store_info resulting from the later sink swap from buffered mode
    // into in-memory mode.
    // This arrives _before_ the data itself since we're using manual flushing.
    expect_store_info(msgs.next().unwrap());

    // Everything below was sent as a result of the implicit flush when swapping the
    // underlying sink from buffered to in-memory.

    // Chunk that contains the `RecordingProperties`.
    expect_chunk(msgs.next().unwrap());

    // Another chunk that contains `RecordingProperties`.
    expect_chunk(msgs.next().unwrap());

    // Final message is the batched chunk itself.
    expect_chunk(msgs.next().unwrap());

    // That's all.
    assert!(msgs.next().is_none());
}
2619
/// With a batcher that always flushes, swapping the sink must deliver two store infos
/// followed by five single-row chunks.
#[test]
fn always_flush() {
    let rec = RecordingStreamBuilder::new("rerun_example_always_flush")
        .enabled(true)
        .batcher_config(ChunkBatcherConfig::ALWAYS)
        .buffered()
        .unwrap();

    let store_info = rec.store_info().unwrap();

    // Checks that `msg` is a `SetStoreInfo` carrying this recording's store info.
    let expect_store_info = |msg: LogMsg| match msg {
        LogMsg::SetStoreInfo(msg) => {
            assert!(msg.row_id != *RowId::ZERO);
            similar_asserts::assert_eq!(store_info, msg.info);
        }
        _ => panic!("expected SetStoreInfo"),
    };

    // Checks that `msg` is an `ArrowMsg` for this recording holding a sane chunk.
    let expect_chunk = |msg: LogMsg| match msg {
        LogMsg::ArrowMsg(rid, msg) => {
            assert_eq!(store_info.store_id, rid);

            let chunk = Chunk::from_arrow_msg(&msg).unwrap();

            chunk.sanity_check().unwrap();
        }
        _ => panic!("expected ArrowMsg"),
    };

    for row in example_rows(false) {
        rec.record_row("a".into(), row, false);
    }

    let storage = rec.memory();
    let mut msgs = storage.take().into_iter();

    // 1st message: the set_store_info resulting from the original sink swap to buffered mode.
    expect_store_info(msgs.next().unwrap());

    // 2nd message: the set_store_info resulting from the later sink swap from buffered mode
    // into in-memory mode.
    expect_store_info(msgs.next().unwrap());

    // 3rd through 7th messages are all single-row batched chunks, which were sent as a result
    // of the implicit flush when swapping the underlying sink from buffered to in-memory.
    // Note that these messages include the 2 recording property chunks.
    for _ in 0..5 {
        expect_chunk(msgs.next().unwrap());
    }

    // That's all.
    assert!(msgs.next().is_none());
}
2687
#[test]
fn flush_hierarchy() {
    // Verifies the exact sequence of messages that lands in the in-memory storage when the
    // batcher is configured with `ChunkBatcherConfig::NEVER` and nothing is flushed by hand.
    let (rec, storage) = RecordingStreamBuilder::new("rerun_example_flush_hierarchy")
        .enabled(true)
        .batcher_config(ChunkBatcherConfig::NEVER)
        .memory()
        .unwrap();

    let store_info = rec.store_info().unwrap();

    for row in example_rows(false) {
        rec.record_row("a".into(), row, false);
    }

    // MemorySinkStorage transparently handles flushing during `take()`!
    // Consume the messages front-to-back, i.e. in the order the storage returned them.
    let mut msgs = storage.take().into_iter();

    // Checks that a message is a `SetStoreInfo` carrying this recording's store info.
    let expect_store_info = |log_msg: LogMsg| match log_msg {
        LogMsg::SetStoreInfo(msg) => {
            assert!(msg.row_id != *RowId::ZERO);
            similar_asserts::assert_eq!(store_info, msg.info);
        }
        _ => panic!("expected SetStoreInfo"),
    };

    // Checks that a message is an `ArrowMsg` for this recording that decodes into a sane chunk.
    let expect_chunk = |log_msg: LogMsg| match log_msg {
        LogMsg::ArrowMsg(rid, msg) => {
            assert_eq!(store_info.store_id, rid);

            let chunk = Chunk::from_arrow_msg(&msg).unwrap();

            chunk.sanity_check().unwrap();
        }
        _ => panic!("expected ArrowMsg"),
    };

    // First message should be a set_store_info resulting from the original sink swap
    // to in-memory mode.
    expect_store_info(msgs.next().unwrap());

    // For reasons, MemorySink ends up with 2 StoreInfos.
    // TODO(jleibs): Avoid a redundant StoreInfo message.
    expect_store_info(msgs.next().unwrap());

    // The batch that contains the `RecordingProperties`.
    expect_chunk(msgs.next().unwrap());

    // For the same reasons as above, another chunk that contains the `RecordingProperties`.
    expect_chunk(msgs.next().unwrap());

    // The batched chunk itself. This test never flushes explicitly, so it can only have been
    // sent by the flush that happens as part of `take()` above.
    expect_chunk(msgs.next().unwrap());

    // That's all.
    assert!(msgs.next().is_none());
}
2772
#[test]
fn disabled() {
    // Records rows into a stream built with `.enabled(false)` and checks that the in-memory
    // storage stays empty.
    let (rec, storage) = RecordingStreamBuilder::new("rerun_example_disabled")
        .enabled(false)
        .batcher_config(ChunkBatcherConfig::ALWAYS)
        .memory()
        .unwrap();

    for row in example_rows(false) {
        rec.record_row("a".into(), row, false);
    }

    // That's all: not a single message should have reached the storage.
    assert!(storage.take().is_empty());
}
2795
#[test]
fn test_set_thread_local() {
    // Regression-test for https://github.com/rerun-io/rerun/issues/2889
    //
    // A buffered stream is installed as the thread-local recording of a short-lived thread,
    // which then exits right away.
    let handle = std::thread::Builder::new()
        .name("test_thread".to_owned())
        .spawn(|| {
            let stream = RecordingStreamBuilder::new("rerun_example_test")
                .buffered()
                .unwrap();
            RecordingStream::set_thread_local(StoreKind::Recording, Some(stream));
        })
        .unwrap();

    // `join` returns `Err` if the spawned thread panicked, so unwrapping it fails this test.
    handle.join().unwrap();
}
2811
2812 fn example_rows(static_: bool) -> Vec<PendingRow> {
2813 use re_log_types::example_components::{MyColor, MyLabel, MyPoint};
2814 use re_types::{Component as _, Loggable};
2815
2816 let mut tick = 0i64;
2817 let mut timepoint = |frame_nr: i64| {
2818 let mut tp = TimePoint::default();
2819 if !static_ {
2820 tp.insert(Timeline::log_time(), re_log_types::Timestamp::now());
2821 tp.insert(Timeline::log_tick(), tick);
2822 tp.insert(Timeline::new_sequence("frame_nr"), frame_nr);
2823 }
2824 tick += 1;
2825 tp
2826 };
2827
2828 let row0 = {
2829 PendingRow {
2830 row_id: RowId::new(),
2831 timepoint: timepoint(1),
2832 components: [
2833 (
2834 MyPoint::descriptor(),
2835 <MyPoint as Loggable>::to_arrow([
2836 MyPoint::new(10.0, 10.0),
2837 MyPoint::new(20.0, 20.0),
2838 ])
2839 .unwrap(),
2840 ), //
2841 (
2842 MyColor::descriptor(),
2843 <MyColor as Loggable>::to_arrow([MyColor(0x8080_80FF)]).unwrap(),
2844 ), //
2845 (
2846 MyLabel::descriptor(),
2847 <MyLabel as Loggable>::to_arrow([] as [MyLabel; 0]).unwrap(),
2848 ), //
2849 ]
2850 .into_iter()
2851 .collect(),
2852 }
2853 };
2854
2855 let row1 = {
2856 PendingRow {
2857 row_id: RowId::new(),
2858 timepoint: timepoint(1),
2859 components: [
2860 (
2861 MyPoint::descriptor(),
2862 <MyPoint as Loggable>::to_arrow([] as [MyPoint; 0]).unwrap(),
2863 ), //
2864 (
2865 MyColor::descriptor(),
2866 <MyColor as Loggable>::to_arrow([] as [MyColor; 0]).unwrap(),
2867 ), //
2868 (
2869 MyLabel::descriptor(),
2870 <MyLabel as Loggable>::to_arrow([] as [MyLabel; 0]).unwrap(),
2871 ), //
2872 ]
2873 .into_iter()
2874 .collect(),
2875 }
2876 };
2877
2878 let row2 = {
2879 PendingRow {
2880 row_id: RowId::new(),
2881 timepoint: timepoint(1),
2882 components: [
2883 (
2884 MyPoint::descriptor(),
2885 <MyPoint as Loggable>::to_arrow([] as [MyPoint; 0]).unwrap(),
2886 ), //
2887 (
2888 MyColor::descriptor(),
2889 <MyColor as Loggable>::to_arrow([MyColor(0xFFFF_FFFF)]).unwrap(),
2890 ), //
2891 (
2892 MyLabel::descriptor(),
2893 <MyLabel as Loggable>::to_arrow([MyLabel("hey".into())]).unwrap(),
2894 ), //
2895 ]
2896 .into_iter()
2897 .collect(),
2898 }
2899 };
2900
2901 vec![row0, row1, row2]
2902 }
2903
// Compile-time regression test: logging a serialized component batch by reference must
// type-check. See <https://github.com/rerun-io/rerun/pull/8587> for context.
#[test]
fn allows_componentbatch_unsized() {
    use re_types::ComponentBatch as _;

    // The stream is disabled: only the fact that the `log` call compiles matters here.
    let (rec, _mem) = RecordingStreamBuilder::new("rerun_example_test_componentbatch_unsized")
        .default_enabled(false)
        .enabled(false)
        .memory()
        .unwrap();

    let labels = ["a", "b", "c"].map(|text| MyLabel(text.into()));

    // This call used to *not* compile due to a lack of `?Sized` bounds.
    rec.log("labels", &labels.try_serialized().unwrap())
        .unwrap();
}
2924}