Skip to main content

rerun_c/
lib.rs

1//! The Rerun C SDK.
2//!
3//! The functions here must match `rerun_cpp/src/rerun/c/rerun.h`.
4
5#![crate_type = "staticlib"]
6#![expect(clippy::missing_safety_doc, clippy::undocumented_unsafe_blocks)] // Too much unsafe
7
8mod arrow_utils;
9mod component_type_registry;
10mod error;
11mod ptr;
12mod recording_streams;
13mod video;
14
15use std::ffi::{CString, c_char, c_float, c_uchar};
16use std::time::Duration;
17
18use arrow::array::{ArrayRef as ArrowArrayRef, ListArray as ArrowListArray};
19use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
20use arrow_utils::arrow_array_from_c_ffi;
21use component_type_registry::COMPONENT_TYPES;
22use re_arrow_util::ArrowArrayDowncastRef as _;
23use re_sdk::external::nohash_hasher::IntMap;
24use re_sdk::external::re_log_types::TimelineName;
25use re_sdk::log::{Chunk, ChunkId, PendingRow, TimeColumn};
26use re_sdk::time::TimeType;
27use re_sdk::{
28    ComponentDescriptor, EntityPath, RecordingStream, RecordingStreamBuilder, StoreKind, TimeCell,
29    TimePoint, Timeline,
30};
31use recording_streams::{RECORDING_STREAMS, recording_stream};
32
33// ----------------------------------------------------------------------------
34// Types:
35
/// This is called `rr_string` in the C API.
///
/// A length-delimited string view handed over from the C side.
///
/// NOTE: [`CStringView`] is NOT an `Option`, and there is no difference between null and "".
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct CStringView {
    /// Pointer to the first character. Not looked at when `length` is zero.
    pub string: *const c_char,

    /// Length of the string (presumably in bytes — TODO confirm against `ptr::try_char_ptr_as_str`).
    ///
    /// Zero means "the empty string", regardless of what `string` points to.
    pub length: u32,
}
45
46impl CStringView {
47    /// Error if the string is not valid UTF8, or is null and non-zero in length.
48    ///
49    /// May return the empty string.
50    #[expect(clippy::result_large_err)]
51    pub fn as_maybe_empty_str<'a>(&'a self, argument_name: &'a str) -> Result<&'a str, CError> {
52        if self.is_empty() {
53            Ok("")
54        } else {
55            re_log::debug_assert!(
56                1000 < self.string.addr() && self.length < 1_000_000,
57                "Suspected memory corruption when reading argument {argument_name:?}: {self:#?}"
58            );
59            ptr::try_char_ptr_as_str(self.string, self.length, argument_name)
60        }
61    }
62
63    /// Treat the empty string "" as None.
64    #[expect(clippy::result_large_err)]
65    pub fn as_optional_str<'a>(
66        &'a self,
67        argument_name: &'a str,
68    ) -> Result<Option<&'a str>, CError> {
69        if self.is_empty() {
70            Ok(None)
71        } else {
72            self.as_nonempty_str(argument_name).map(Some)
73        }
74    }
75
76    /// Error if the string was empty.
77    #[expect(clippy::result_large_err)]
78    pub fn as_nonempty_str<'a>(&'a self, argument_name: &'a str) -> Result<&'a str, CError> {
79        if self.is_empty() {
80            Err(CError::new(
81                CErrorCode::InvalidStringArgument,
82                &format!("{argument_name:?} was an empty string"),
83            ))
84        } else {
85            self.as_maybe_empty_str(argument_name)
86        }
87    }
88
89    /// Is this the "" string?
90    ///
91    /// NOTE: [`CStringView`] is NOT an `Option`, and there is no difference between null and "".
92    pub fn is_empty(&self) -> bool {
93        self.length == 0
94    }
95}
96
/// This is called `rr_bytes` in the C API.
///
/// A pointer + length pair describing a byte buffer handed over from the C side.
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct CBytesView {
    /// Pointer to the first byte; may be null (see [`CBytesView::is_null`]).
    pub bytes: *const c_uchar,

    /// Number of bytes in the buffer.
    pub length: u32,
}
104
105impl CBytesView {
106    #[expect(clippy::result_large_err)]
107    pub fn as_bytes<'a>(&self, argument_name: &'a str) -> Result<&'a [u8], CError> {
108        ptr::try_ptr_as_slice(self.bytes, self.length, argument_name)
109    }
110
111    pub fn is_null(&self) -> bool {
112        self.bytes.is_null()
113    }
114
115    pub fn is_empty(&self) -> bool {
116        self.length == 0
117    }
118}
119
/// Handle that identifies a recording stream across the C API boundary.
pub type CRecordingStream = u32;

/// Handle that identifies a registered component type (see `rr_register_component_type`).
pub type CComponentTypeHandle = u32;

/// Reserved stream handle.
/// NOTE(review): presumably resolves to the currently active recording — confirm against the C header.
pub const RR_REC_STREAM_CURRENT_RECORDING: CRecordingStream = 0xFFFFFFFF;

/// Reserved stream handle.
/// NOTE(review): presumably resolves to the currently active blueprint — confirm against the C header.
pub const RR_REC_STREAM_CURRENT_BLUEPRINT: CRecordingStream = 0xFFFFFFFE;

/// Sentinel returned by `rr_register_component_type` when registration fails.
pub const RR_COMPONENT_TYPE_HANDLE_INVALID: CComponentTypeHandle = 0xFFFFFFFF;
127
/// C version of [`re_sdk::SpawnOptions`].
///
/// Fields left at their "empty" value (port `0`, empty strings) keep the defaults of
/// [`re_sdk::SpawnOptions`]; see [`CSpawnOptions::as_rust`].
///
/// See `rr_spawn_options` in the C header.
#[derive(Debug, Clone)]
#[repr(C)]
pub struct CSpawnOptions {
    /// Port to use; `0` keeps the default port.
    pub port: u16,

    /// Memory limit as a string; empty keeps the default.
    pub memory_limit: CStringView,

    /// Server memory limit as a string; empty keeps the default.
    pub server_memory_limit: CStringView,

    /// Forwarded unchanged to `SpawnOptions::hide_welcome_screen`.
    pub hide_welcome_screen: bool,

    /// Forwarded unchanged to `SpawnOptions::detach_process`.
    pub detach_process: bool,

    /// Name of the executable; empty keeps the default.
    pub executable_name: CStringView,

    /// Explicit path to the executable; empty keeps the default.
    pub executable_path: CStringView,
}
142
143impl CSpawnOptions {
144    #[expect(clippy::result_large_err)]
145    pub fn as_rust(&self) -> Result<re_sdk::SpawnOptions, CError> {
146        let Self {
147            port,
148            memory_limit,
149            server_memory_limit,
150            hide_welcome_screen,
151            detach_process,
152            executable_name,
153            executable_path,
154        } = self;
155
156        let mut spawn_opts = re_sdk::SpawnOptions::default();
157
158        if *port != 0 {
159            spawn_opts.port = *port;
160        }
161
162        spawn_opts.wait_for_bind = true;
163
164        if let Some(memory_limit) = memory_limit.as_optional_str("memory_limit")? {
165            spawn_opts.memory_limit = memory_limit.to_owned();
166        }
167        if let Some(server_memory_limit) =
168            server_memory_limit.as_optional_str("server_memory_limit")?
169        {
170            spawn_opts.server_memory_limit = server_memory_limit.to_owned();
171        }
172
173        spawn_opts.hide_welcome_screen = *hide_welcome_screen;
174        spawn_opts.detach_process = *detach_process;
175
176        if let Some(executable_name) = executable_name.as_optional_str("executable_name")? {
177            spawn_opts.executable_name = executable_name.to_owned();
178        }
179
180        if let Some(executable_path) = executable_path.as_optional_str("executable_path")? {
181            spawn_opts.executable_path = Some(executable_path.to_owned());
182        }
183
184        Ok(spawn_opts)
185    }
186}
187
/// C-compatible counterpart of [`StoreKind`].
///
/// Represented as a `u32` with the explicit discriminants listed below.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CStoreKind {
    /// A recording of user-data.
    Recording = 1,

    /// Data associated with the blueprint state.
    Blueprint = 2,
}
197
198impl From<CStoreKind> for StoreKind {
199    fn from(kind: CStoreKind) -> Self {
200        match kind {
201            CStoreKind::Recording => Self::Recording,
202            CStoreKind::Blueprint => Self::Blueprint,
203        }
204    }
205}
206
/// Describes the store a new recording stream should log to.
///
/// See `rr_store_info` in the C header.
#[repr(C)]
#[derive(Debug)]
pub struct CStoreInfo {
    /// The user-chosen name of the application doing the logging.
    ///
    /// Must not be empty when creating a recording stream.
    pub application_id: CStringView,

    /// The user-chosen name of the recording being logged to.
    ///
    /// Defaults to a random ID if unspecified.
    pub recording_id: CStringView,

    /// Whether the store holds recording or blueprint data.
    pub store_kind: CStoreKind,
}
221
/// C version of [`ComponentDescriptor`].
///
/// See `rr_component_descriptor` in the C header.
#[repr(C)]
pub struct CComponentDescriptor {
    /// Optional archetype name; the empty string means "none".
    pub archetype_name: CStringView,

    /// Component identifier; must not be empty when registering a component type.
    pub component: CStringView,

    /// Optional component type name; the empty string means "none".
    pub component_type: CStringView,
}
229
/// A component descriptor together with the arrow schema of its data.
///
/// See `rr_component_type` in the C header.
#[repr(C)]
pub struct CComponentType {
    /// Identifies the component being registered.
    pub descriptor: CComponentDescriptor,

    /// Arrow C data interface schema; imported as an arrow `Field` on registration.
    pub schema: FFI_ArrowSchema,
}
236
/// A single batch of component data.
///
/// See `rr_component_batch` in the C header.
#[repr(C)]
pub struct CComponentBatch {
    /// Handle of a previously registered component type.
    pub component_type: CComponentTypeHandle,

    /// The component data as an arrow C data interface array.
    pub array: FFI_ArrowArray,
}
243
/// A row of component batches to be logged to one entity.
#[repr(C)]
pub struct CDataRow {
    /// Path of the entity the row is logged to.
    pub entity_path: CStringView,

    /// NOTE(review): presumably the number of elements `batches` points to — confirm against the C header.
    pub num_data_cells: u32,

    /// Pointer to the component batches of this row.
    pub batches: *mut CComponentBatch,
}
250
/// A column of component data.
///
/// See `rr_component_column` in the C header.
#[repr(C)]
pub struct CComponentColumns {
    /// Handle of a previously registered component type.
    pub component_type: CComponentTypeHandle,

    /// A `ListArray` with the datatype `List(component_type)`.
    pub array: FFI_ArrowArray,
}
259
/// Describes whether a column of times is known to be sorted.
///
/// See `rr_sorting_status` in the C header.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CSortingStatus {
    Unknown = 0,
    Sorted = 1,
    Unsorted = 2,
}

impl CSortingStatus {
    /// Translates the status into a tri-state: `Some(true)` / `Some(false)` when the sorting is
    /// known, `None` when it is [`CSortingStatus::Unknown`].
    fn is_sorted(&self) -> Option<bool> {
        match *self {
            Self::Unknown => None,
            Self::Sorted => Some(true),
            Self::Unsorted => Some(false),
        }
    }
}
278
/// The kind of values stored on a timeline.
///
/// See `rr_time_type` in the C header.
/// Equivalent to Rust [`re_sdk::time::TimeType`].
#[repr(u32)]
#[derive(Debug, Clone, Copy)]
pub enum CTimeType {
    /// Used e.g. for frames in a film.
    Sequence = 1,

    /// Nanoseconds.
    Duration = 2,

    /// Nanoseconds since Unix epoch (1970-01-01 00:00:00 UTC).
    Timestamp = 3,
}
293
/// A named timeline together with the type of its time values.
///
/// See `rr_timeline` in the C header.
/// Equivalent to Rust [`re_sdk::Timeline`].
#[repr(C)]
#[derive(Debug, Clone)]
pub struct CTimeline {
    /// The name of the timeline. Must not be empty when converted to a [`Timeline`].
    pub name: CStringView,

    /// The type of the timeline.
    pub typ: CTimeType,
}
305
306impl TryFrom<CTimeline> for Timeline {
307    type Error = CError;
308
309    fn try_from(timeline: CTimeline) -> Result<Self, CError> {
310        let name = timeline.name.as_nonempty_str("timeline.name")?;
311        let typ = match timeline.typ {
312            CTimeType::Sequence => TimeType::Sequence,
313            CTimeType::Duration => TimeType::DurationNs,
314            CTimeType::Timestamp => TimeType::TimestampNs,
315        };
316        Ok(Self::new(name, typ))
317    }
318}
319
/// A column of time values belonging to a single timeline.
///
/// See `rr_time_column` in the C header.
/// Equivalent to Rust [`re_sdk::log::TimeColumn`].
#[repr(C)]
pub struct CTimeColumn {
    /// The timeline the times belong to.
    pub timeline: CTimeline,

    /// Times, a primitive array of i64.
    pub times: FFI_ArrowArray,

    /// The sorting order of the times array.
    pub sorting_status: CSortingStatus,
}
332
/// Log sink which streams messages to a gRPC server.
///
/// The behavior of this sink is the same as the one set by `rr_recording_stream_connect_grpc`.
///
/// See `rr_grpc_sink` in the C header.
#[derive(Debug)]
#[repr(C)]
pub struct CGrpcSink {
    /// A Rerun gRPC URL.
    ///
    /// Default is `rerun+http://127.0.0.1:9876/proxy`.
    ///
    /// NOTE(review): `rr_recording_stream_set_sinks` rejects an empty URL, so the default above
    /// is presumably applied on the C/C++ side — confirm.
    pub url: CStringView,
}
346
/// Log sink which writes messages to a file.
///
/// See `rr_file_sink` in the C header.
#[derive(Debug)]
#[repr(C)]
pub struct CFileSink {
    /// Path to the output file. Must not be empty.
    pub path: CStringView,
}
356
/// A sink for log messages.
///
/// See specific log sink types for more information:
/// * [`CGrpcSink`]
/// * [`CFileSink`]
///
/// See `rr_log_sink` and `RR_LOG_SINK_KIND` enum values in the C header.
///
/// Layout is defined in [the Rust reference](https://doc.rust-lang.org/stable/reference/type-layout.html#reprc-enums-with-fields).
/// The `u8` tag values below must stay in sync with the kind values used on the C side.
#[derive(Debug)]
#[repr(C, u8)]
pub enum CLogSink {
    /// Stream to a gRPC server.
    GrpcSink { grpc: CGrpcSink } = 0,

    /// Write to a file.
    FileSink { file: CFileSink } = 1,
}
372
// ⚠️ Remember to also update `uint32_t rr_error_code` AND `enum class ErrorCode` !
/// Error codes reported through [`CError`].
///
/// The numeric values are shared with the C and C++ headers, so existing values must not change.
/// Variants without an explicit value take the value of the preceding variant plus one.
/// The `_Category…` variants are not errors themselves; they mark the start of the numeric range
/// used by the errors that follow them.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CErrorCode {
    Ok = 0,
    OutOfMemory,
    NotImplemented,
    SdkVersionMismatch,

    // Invalid argument errors.
    _CategoryArgument = 0x0000_00010,
    UnexpectedNullArgument,
    InvalidStringArgument,
    InvalidEnumValue,
    InvalidRecordingStreamHandle,
    InvalidSocketAddress,
    InvalidComponentTypeHandle,
    InvalidTimeArgument,
    InvalidTensorDimension,
    InvalidComponent,
    InvalidServerUrl = 0x0000_0001a,
    FileRead,
    InvalidMemoryLimit,

    // Recording stream errors
    _CategoryRecordingStream = 0x0000_00100,
    RecordingStreamRuntimeFailure,
    RecordingStreamCreationFailure,
    RecordingStreamSaveFailure,
    RecordingStreamStdoutFailure,
    RecordingStreamSpawnFailure,
    RecordingStreamChunkValidationFailure,
    RecordingStreamServeGrpcFailure,
    RecordingStreamFlushTimeout,
    RecordingStreamFlushFailure,

    // Arrow data processing errors.
    _CategoryArrow = 0x0000_1000,
    ArrowFfiSchemaImportError,
    ArrowFfiArrayImportError,

    // Utility errors.
    _CategoryUtilities = 0x0001_0000,
    VideoLoadError,

    // Errors relating to file IO.
    _CategoryFileIO = 0x0010_0000,
    FileOpenFailure,

    // Errors directly translated from arrow::StatusCode.
    _CategoryArrowCppStatus = 0x1000_0000,

    // Catch-all for errors that have no dedicated code.
    Unknown = 0xFFFF_FFFF,
}
427
/// Error value written back to the C caller: an error code plus a fixed-size message buffer.
#[repr(C)]
#[derive(Clone)]
pub struct CError {
    /// What kind of error occurred.
    pub code: CErrorCode,

    /// Human-readable description, stored inline in a buffer of `MAX_MESSAGE_SIZE_BYTES` chars.
    /// NOTE(review): presumably NUL-terminated — confirm against `CError::new`.
    pub message: [c_char; Self::MAX_MESSAGE_SIZE_BYTES],
}
434
435// ----------------------------------------------------------------------------
436// Public functions:
437
438// SAFETY: the unsafety comes from #[no_mangle], because we can declare multiple
439// functions with the same symbol names, and the linker behavior in this case i undefined.
440#[expect(unsafe_code)]
441#[unsafe(no_mangle)]
442pub extern "C" fn rr_version_string() -> *const c_char {
443    static VERSION: std::sync::LazyLock<CString> = std::sync::LazyLock::new(|| {
444        CString::new(re_sdk::build_info().version.to_string()).expect("CString::new failed")
445    }); // unwrap: there won't be any NUL bytes in the string
446
447    VERSION.as_ptr()
448}
449
450#[expect(clippy::result_large_err)]
451fn rr_spawn_impl(spawn_opts: *const CSpawnOptions) -> Result<(), CError> {
452    let spawn_opts = if spawn_opts.is_null() {
453        re_sdk::SpawnOptions::default()
454    } else {
455        let spawn_opts = ptr::try_ptr_as_ref(spawn_opts, "spawn_opts")?;
456        spawn_opts.as_rust()?
457    };
458
459    // Port is unused here — this function only spawns the viewer process.
460    // The C SDK connects separately via `rr_recording_stream_spawn`.
461    re_sdk::spawn(&spawn_opts)
462        .map(drop)
463        .map_err(|err| CError::new(CErrorCode::RecordingStreamSpawnFailure, &err.to_string()))?;
464
465    Ok(())
466}
467
468#[expect(unsafe_code)]
469#[unsafe(no_mangle)]
470pub extern "C" fn rr_spawn(spawn_opts: *const CSpawnOptions, error: *mut CError) {
471    if let Err(err) = rr_spawn_impl(spawn_opts) {
472        err.write_error(error);
473    }
474}
475
476#[expect(clippy::result_large_err)]
477fn rr_register_component_type_impl(
478    component_type: &CComponentType,
479) -> Result<CComponentTypeHandle, CError> {
480    let CComponentDescriptor {
481        archetype_name,
482        component,
483        component_type: component_type_descr,
484    } = &component_type.descriptor;
485
486    let archetype_name =
487        archetype_name.as_optional_str("component_type.descriptor.archetype_name")?;
488
489    let component = component.as_nonempty_str("component_type.descriptor.component")?;
490
491    let component_type_descr =
492        component_type_descr.as_optional_str("component_type.descriptor.component_type")?;
493
494    let component_descr = ComponentDescriptor {
495        archetype: archetype_name.map(Into::into),
496        component: component.into(),
497        component_type: component_type_descr.map(Into::into),
498    };
499
500    let field = arrow::datatypes::Field::try_from(&component_type.schema).map_err(|err| {
501        CError::new(
502            CErrorCode::ArrowFfiSchemaImportError,
503            &format!("Failed to import ffi schema: {err}"),
504        )
505    })?;
506
507    Ok(COMPONENT_TYPES
508        .write()
509        .register(component_descr, field.data_type().clone()))
510}
511
512#[expect(unsafe_code)]
513#[unsafe(no_mangle)]
514pub extern "C" fn rr_register_component_type(
515    // Note that since this is passed by value, arrow will release the schema on drop!
516    component_type: CComponentType,
517    error: *mut CError,
518) -> u32 {
519    match rr_register_component_type_impl(&component_type) {
520        Ok(id) => id,
521        Err(err) => {
522            err.write_error(error);
523            RR_COMPONENT_TYPE_HANDLE_INVALID
524        }
525    }
526}
527
528#[expect(clippy::result_large_err)]
529fn rr_recording_stream_new_impl(
530    store_info: *const CStoreInfo,
531    default_enabled: bool,
532) -> Result<CRecordingStream, CError> {
533    {
534        use std::sync::Once;
535        static INIT: Once = Once::new();
536        INIT.call_once(|| {
537            re_log::setup_logging();
538            if cfg!(debug_assertions) {
539                re_crash_handler::install_crash_handlers(re_build_info::build_info!());
540
541                // Log a clear warning to inform users that (accidentally) use a debug build of the SDK.
542                // This should however _never_ cause a panic if RERUN_PANIC_ON_WARN is set, e.g. in test environments.
543                const DEBUG_BUILD_WARNING: &str =
544                    "Using a DEBUG BUILD of the Rerun SDK with Rerun crash handlers!";
545                let can_log_warning = std::env::var("RERUN_PANIC_ON_WARN")
546                    .map(|value| value == "0")
547                    .unwrap_or(true);
548                if can_log_warning {
549                    re_log::warn!(DEBUG_BUILD_WARNING);
550                } else {
551                    re_log::info!(DEBUG_BUILD_WARNING);
552                }
553            }
554        });
555    }
556
557    let store_info = ptr::try_ptr_as_ref(store_info, "store_info")?;
558
559    let CStoreInfo {
560        application_id,
561        recording_id,
562        store_kind,
563    } = *store_info;
564
565    let application_id = application_id.as_nonempty_str("store_info.application_id")?;
566
567    let mut rec_builder = RecordingStreamBuilder::new(application_id)
568        //.store_id(recording_id.clone()) // TODO(andreas): Expose store id.
569        .store_source(re_sdk::external::re_log_types::StoreSource::CSdk)
570        .default_enabled(default_enabled);
571
572    if let Some(recording_id) = recording_id.as_optional_str("recording_id")? {
573        rec_builder = rec_builder.recording_id(recording_id);
574    }
575
576    if store_kind == CStoreKind::Blueprint {
577        rec_builder = rec_builder.blueprint();
578    }
579
580    let rec = rec_builder.buffered().map_err(|err| {
581        CError::new(
582            CErrorCode::RecordingStreamCreationFailure,
583            &format!("Failed to create recording stream: {err}"),
584        )
585    })?;
586    Ok(RECORDING_STREAMS.lock().insert(rec))
587}
588
589#[expect(unsafe_code)]
590#[unsafe(no_mangle)]
591pub extern "C" fn rr_recording_stream_new(
592    store_info: *const CStoreInfo,
593    default_enabled: bool,
594    error: *mut CError,
595) -> CRecordingStream {
596    match rr_recording_stream_new_impl(store_info, default_enabled) {
597        Err(err) => {
598            err.write_error(error);
599            0
600        }
601        Ok(id) => id,
602    }
603}
604
/// Zero-sized marker type whose only purpose is to have a `Drop` implementation.
///
/// See `THREAD_LIFE_TRACKER` for more information.
struct TrivialTypeWithDrop;

impl Drop for TrivialTypeWithDrop {
    fn drop(&mut self) {
        // Try to ensure that drop doesn't get optimized away.
        std::hint::black_box(self);
    }
}
614
thread_local! {
    /// Used to detect that the current thread is shutting down.
    ///
    /// It can happen that we end up inside of [`rr_recording_stream_free`] during a thread shutdown.
    /// This happens either when:
    /// * the application shuts down, causing the destructor of globally defined recordings to be invoked
    ///   -> Not an issue, we likely already destroyed the recording list.
    /// * the user stored their C++ recording in a thread local variable, and then shut down the thread.
    ///   -> More problematic, since we can't access `RECORDING_STREAMS` now, meaning we leak the recording.
    ///      (we can't access it because we use channels internally which in turn use thread-local storage)
    ///
    /// In either case we have a problem, since destroying a recording bottoms out to some thread-local storage
    /// access inside of channels, causing a crash!
    ///
    /// So how do we figure out that our thread is shutting down?
    /// As of writing `std::thread::current()` panics if there's nothing on `std::sys_common::thread_info::current_thread()`.
    /// Unfortunately, `std::sys_common` is a private implementation detail!
    /// So instead, we try accessing a thread local variable and see if that's still possible.
    /// If not, then we assume that the thread is shutting down.
    ///
    /// Just any thread local variable will not do though!
    /// We need something that is guaranteed to be dropped with the thread shutting down.
    /// A simple integer value won't do that, `Box` works but seems wasteful, so we use a trivial type with a drop implementation.
    pub static THREAD_LIFE_TRACKER: TrivialTypeWithDrop = const { TrivialTypeWithDrop };
}
637
638#[expect(unsafe_code)]
639#[unsafe(no_mangle)]
640pub extern "C" fn rr_recording_stream_free(id: CRecordingStream) {
641    if THREAD_LIFE_TRACKER.try_with(|_v| {}).is_ok() {
642        if let Some(stream) = RECORDING_STREAMS.lock().remove(id) {
643            // Before we called `stream.disconnect()` here`, which unnecessarily replaced the current sink with a
644            // buffered sink that would be immediately dropped afterwards. Not only did this cause spam in the
645            // log outputs, it also lead to race conditions upon (log) application shutdown.
646            drop(stream);
647        }
648    } else {
649        // Yes, at least as of writing we can still log things in this state!
650        re_log::debug!(
651            "rr_recording_stream_free called on a thread that is shutting down and can no longer access thread locals. We can't handle this and have to ignore this call."
652        );
653    }
654}
655
656#[expect(unsafe_code)]
657#[unsafe(no_mangle)]
658pub extern "C" fn rr_recording_stream_set_global(id: CRecordingStream, store_kind: CStoreKind) {
659    let stream = RECORDING_STREAMS.lock().get(id);
660    RecordingStream::set_global(store_kind.into(), stream);
661}
662
663#[expect(unsafe_code)]
664#[unsafe(no_mangle)]
665pub extern "C" fn rr_recording_stream_set_thread_local(
666    id: CRecordingStream,
667    store_kind: CStoreKind,
668) {
669    let stream = RECORDING_STREAMS.lock().get(id);
670    RecordingStream::set_thread_local(store_kind.into(), stream);
671}
672
673#[expect(unsafe_code)]
674#[unsafe(no_mangle)]
675pub extern "C" fn rr_recording_stream_is_enabled(
676    stream: CRecordingStream,
677    error: *mut CError,
678) -> bool {
679    match rr_recording_stream_is_enabled_impl(stream) {
680        Ok(enabled) => enabled,
681        Err(err) => {
682            err.write_error(error);
683            false
684        }
685    }
686}
687
688#[expect(clippy::result_large_err)]
689fn rr_recording_stream_is_enabled_impl(id: CRecordingStream) -> Result<bool, CError> {
690    Ok(recording_stream(id)?.is_enabled())
691}
692
693#[expect(unsafe_code)]
694#[unsafe(no_mangle)]
695pub unsafe extern "C" fn rr_recording_stream_flush_blocking(
696    id: CRecordingStream,
697    timeout_sec: c_float,
698    error: *mut CError,
699) {
700    if let Some(stream) = RECORDING_STREAMS.lock().get(id) {
701        let timeout = if timeout_sec.is_nan() {
702            if let Some(error) = unsafe { error.as_mut() } {
703                *error = CError::new(CErrorCode::InvalidTimeArgument, "NaN timeout");
704            }
705            Duration::ZERO
706        } else if timeout_sec < 0.0 {
707            if let Some(error) = unsafe { error.as_mut() } {
708                *error = CError::new(CErrorCode::InvalidTimeArgument, "Negative timeout");
709            }
710            Duration::ZERO
711        } else {
712            Duration::try_from_secs_f32(timeout_sec)
713                .ok()
714                .unwrap_or(Duration::MAX)
715        };
716        if let Err(err) = stream.flush_with_timeout(timeout)
717            && let Some(error) = unsafe { error.as_mut() }
718        {
719            let code = match &err {
720                re_sdk::sink::SinkFlushError::Timeout => CErrorCode::RecordingStreamFlushTimeout,
721                re_sdk::sink::SinkFlushError::Failed { .. } => {
722                    CErrorCode::RecordingStreamFlushFailure
723                }
724            };
725            *error = CError::new(code, &err.to_string());
726        }
727    }
728}
729
730#[expect(unsafe_code)]
731#[expect(clippy::result_large_err)]
732fn rr_recording_stream_set_sinks_impl(
733    stream: CRecordingStream,
734    raw_sinks: *mut CLogSink,
735    num_sinks: u32,
736) -> Result<(), CError> {
737    let stream = recording_stream(stream)?;
738
739    let raw_sinks = unsafe { std::slice::from_raw_parts_mut(raw_sinks, num_sinks as usize) };
740
741    let mut sinks: Vec<Box<dyn re_sdk::sink::LogSink>> = Vec::with_capacity(num_sinks as usize);
742    for sink in raw_sinks {
743        match sink {
744            CLogSink::GrpcSink { grpc } => {
745                let uri = grpc
746                    .url
747                    .as_nonempty_str("url")?
748                    .parse::<re_sdk::external::re_uri::ProxyUri>()
749                    .map_err(|err| CError::new(CErrorCode::InvalidServerUrl, &err.to_string()))?;
750                sinks.push(Box::new(re_sdk::sink::GrpcSink::new(uri)));
751            }
752            CLogSink::FileSink { file } => {
753                let path = file.path.as_nonempty_str("path")?;
754                sinks.push(Box::new(re_sdk::sink::FileSink::new(path).map_err(
755                    |err| {
756                        CError::new(
757                            CErrorCode::RecordingStreamSaveFailure,
758                            &format!("Failed to save recording stream to {path:?}: {err}"),
759                        )
760                    },
761                )?));
762            }
763        }
764    }
765
766    stream.set_sinks(sinks);
767
768    Ok(())
769}
770
771#[expect(unsafe_code)]
772#[unsafe(no_mangle)]
773pub extern "C" fn rr_recording_stream_set_sinks(
774    id: CRecordingStream,
775    sinks: *mut CLogSink,
776    num_sinks: u32,
777    error: *mut CError,
778) {
779    if let Err(err) = rr_recording_stream_set_sinks_impl(id, sinks, num_sinks) {
780        err.write_error(error);
781    }
782}
783
784#[expect(clippy::result_large_err)]
785fn rr_recording_stream_connect_grpc_impl(
786    stream: CRecordingStream,
787    url: CStringView,
788) -> Result<(), CError> {
789    let stream = recording_stream(stream)?;
790
791    let url = url.as_nonempty_str("url")?;
792
793    if let Err(err) = stream.connect_grpc_opts(url) {
794        return Err(CError::new(CErrorCode::InvalidServerUrl, &err.to_string()));
795    }
796
797    Ok(())
798}
799
800#[expect(unsafe_code)]
801#[unsafe(no_mangle)]
802pub extern "C" fn rr_recording_stream_connect_grpc(
803    id: CRecordingStream,
804    url: CStringView,
805    error: *mut CError,
806) {
807    if let Err(err) = rr_recording_stream_connect_grpc_impl(id, url) {
808        err.write_error(error);
809    }
810}
811
812#[expect(clippy::result_large_err)]
813fn rr_recording_stream_serve_grpc_impl(
814    stream: CRecordingStream,
815    bind_ip: CStringView,
816    port: u16,
817    server_memory_limit: CStringView,
818    newest_first: bool,
819) -> Result<(), CError> {
820    let stream = recording_stream(stream)?;
821
822    let bind_ip = bind_ip.as_nonempty_str("bind_ip")?;
823    let server_options = re_sdk::ServerOptions {
824        playback_behavior: re_sdk::PlaybackBehavior::from_newest_first(newest_first),
825
826        memory_limit: server_memory_limit
827            .as_maybe_empty_str("server_memory_limit")?
828            .parse::<re_sdk::MemoryLimit>()
829            .map_err(|err| CError::new(CErrorCode::InvalidMemoryLimit, &err))?,
830    };
831
832    stream
833        .serve_grpc_opts(bind_ip, port, server_options)
834        .map_err(|err| {
835            CError::new(
836                CErrorCode::RecordingStreamServeGrpcFailure,
837                &err.to_string(),
838            )
839        })?;
840
841    Ok(())
842}
843
844#[expect(unsafe_code)]
845#[unsafe(no_mangle)]
846pub extern "C" fn rr_recording_stream_serve_grpc(
847    id: CRecordingStream,
848    bind_ip: CStringView,
849    port: u16,
850    server_memory_limit: CStringView,
851    newest_first: bool,
852    error: *mut CError,
853) {
854    if let Err(err) =
855        rr_recording_stream_serve_grpc_impl(id, bind_ip, port, server_memory_limit, newest_first)
856    {
857        err.write_error(error);
858    }
859}
860
861#[expect(clippy::result_large_err)]
862fn rr_recording_stream_spawn_impl(
863    stream: CRecordingStream,
864    spawn_opts: *const CSpawnOptions,
865) -> Result<(), CError> {
866    let stream = recording_stream(stream)?;
867
868    let spawn_opts = if spawn_opts.is_null() {
869        re_sdk::SpawnOptions::default()
870    } else {
871        let spawn_opts = ptr::try_ptr_as_ref(spawn_opts, "spawn_opts")?;
872        spawn_opts.as_rust()?
873    };
874
875    stream
876        .spawn_opts(&spawn_opts)
877        .map_err(|err| CError::new(CErrorCode::RecordingStreamSpawnFailure, &err.to_string()))?;
878
879    Ok(())
880}
881
882#[expect(unsafe_code)]
883#[unsafe(no_mangle)]
884pub extern "C" fn rr_recording_stream_spawn(
885    id: CRecordingStream,
886    spawn_opts: *const CSpawnOptions,
887    error: *mut CError,
888) {
889    if let Err(err) = rr_recording_stream_spawn_impl(id, spawn_opts) {
890        err.write_error(error);
891    }
892}
893
894#[expect(clippy::result_large_err)]
895fn rr_recording_stream_save_impl(
896    stream: CRecordingStream,
897    rrd_filepath: CStringView,
898) -> Result<(), CError> {
899    let rrd_filepath = rrd_filepath.as_nonempty_str("path")?;
900    recording_stream(stream)?.save(rrd_filepath).map_err(|err| {
901        CError::new(
902            CErrorCode::RecordingStreamSaveFailure,
903            &format!("Failed to save recording stream to {rrd_filepath:?}: {err}"),
904        )
905    })
906}
907
908#[expect(unsafe_code)]
909#[unsafe(no_mangle)]
910pub extern "C" fn rr_recording_stream_save(
911    id: CRecordingStream,
912    path: CStringView,
913    error: *mut CError,
914) {
915    if let Err(err) = rr_recording_stream_save_impl(id, path) {
916        err.write_error(error);
917    }
918}
919
920#[expect(clippy::result_large_err)]
921fn rr_recording_stream_stdout_impl(stream: CRecordingStream) -> Result<(), CError> {
922    recording_stream(stream)?.stdout().map_err(|err| {
923        CError::new(
924            CErrorCode::RecordingStreamStdoutFailure,
925            &format!("Failed to forward recording stream to stdout: {err}"),
926        )
927    })
928}
929
930#[expect(unsafe_code)]
931#[unsafe(no_mangle)]
932pub extern "C" fn rr_recording_stream_stdout(id: CRecordingStream, error: *mut CError) {
933    if let Err(err) = rr_recording_stream_stdout_impl(id) {
934        err.write_error(error);
935    }
936}
937
938#[expect(clippy::result_large_err)]
939fn rr_recording_stream_set_time_impl(
940    stream: CRecordingStream,
941    timeline_name: CStringView,
942    time_type: CTimeType,
943    value: i64,
944) -> Result<(), CError> {
945    let timeline = timeline_name.as_nonempty_str("timeline_name")?;
946    let stream = recording_stream(stream)?;
947    let time_type = match time_type {
948        CTimeType::Sequence => TimeType::Sequence,
949        CTimeType::Duration => TimeType::DurationNs,
950        CTimeType::Timestamp => TimeType::TimestampNs,
951    };
952    stream.set_time(timeline, TimeCell::new(time_type, value));
953    Ok(())
954}
955
956#[expect(unsafe_code)]
957#[unsafe(no_mangle)]
958pub extern "C" fn rr_recording_stream_set_time(
959    stream: CRecordingStream,
960    timeline_name: CStringView,
961    time_type: CTimeType,
962    value: i64,
963    error: *mut CError,
964) {
965    if let Err(err) = rr_recording_stream_set_time_impl(stream, timeline_name, time_type, value) {
966        err.write_error(error);
967    }
968}
969
970#[expect(clippy::result_large_err)]
971fn rr_recording_stream_disable_timeline_impl(
972    stream: CRecordingStream,
973    timeline_name: CStringView,
974) -> Result<(), CError> {
975    let timeline = timeline_name.as_nonempty_str("timeline_name")?;
976    recording_stream(stream)?.disable_timeline(timeline);
977    Ok(())
978}
979
980#[expect(unsafe_code)]
981#[unsafe(no_mangle)]
982pub extern "C" fn rr_recording_stream_disable_timeline(
983    stream: CRecordingStream,
984    timeline_name: CStringView,
985    error: *mut CError,
986) {
987    if let Err(err) = rr_recording_stream_disable_timeline_impl(stream, timeline_name) {
988        err.write_error(error);
989    }
990}
991
992#[expect(unsafe_code)]
993#[unsafe(no_mangle)]
994pub extern "C" fn rr_recording_stream_reset_time(stream: CRecordingStream) {
995    if let Some(stream) = RECORDING_STREAMS.lock().get(stream) {
996        stream.reset_time();
997    }
998}
999
1000#[expect(unsafe_code)]
1001#[expect(clippy::result_large_err)]
1002#[expect(clippy::needless_pass_by_value)] // Conceptually we're consuming the data_row, as we take ownership of data it points to.
1003fn rr_recording_stream_log_impl(
1004    stream: CRecordingStream,
1005    data_row: CDataRow,
1006    inject_time: bool,
1007) -> Result<(), CError> {
1008    // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
1009    // TODO(emilk): move to before we arrow-serialize the data
1010    let row_id = re_sdk::log::RowId::new();
1011
1012    let stream = recording_stream(stream)?;
1013
1014    let CDataRow {
1015        entity_path,
1016        num_data_cells,
1017        batches,
1018    } = data_row;
1019
1020    let entity_path = entity_path.as_maybe_empty_str("entity_path")?;
1021    let entity_path = EntityPath::parse_forgiving(entity_path);
1022
1023    let num_data_cells = num_data_cells as usize;
1024
1025    let batches = unsafe { std::slice::from_raw_parts_mut(batches, num_data_cells) };
1026
1027    let mut components = IntMap::default();
1028    {
1029        let component_type_registry = COMPONENT_TYPES.read();
1030
1031        for batch in batches {
1032            let CComponentBatch {
1033                component_type,
1034                array,
1035            } = batch;
1036            let component_type = component_type_registry.get(*component_type)?;
1037            let datatype = component_type.datatype.clone();
1038            let array = unsafe { FFI_ArrowArray::from_raw(array) }; // Move out from `batches`
1039            let values = unsafe { arrow_array_from_c_ffi(array, datatype) }?;
1040            let batch =
1041                re_sdk::SerializedComponentBatch::new(values, component_type.descriptor.clone());
1042            components.insert(batch.descriptor.component, batch);
1043        }
1044    }
1045
1046    let row = PendingRow {
1047        row_id,
1048        timepoint: TimePoint::default(), // we use the one in the recording stream for now
1049        components,
1050    };
1051
1052    stream.record_row(entity_path, row, inject_time);
1053
1054    Ok(())
1055}
1056
1057#[expect(unsafe_code)]
1058#[unsafe(no_mangle)]
1059pub unsafe extern "C" fn rr_recording_stream_log(
1060    stream: CRecordingStream,
1061    data_row: CDataRow,
1062    inject_time: bool,
1063    error: *mut CError,
1064) {
1065    if let Err(err) = rr_recording_stream_log_impl(stream, data_row, inject_time) {
1066        err.write_error(error);
1067    }
1068}
1069
1070#[expect(clippy::result_large_err)]
1071fn rr_recording_stream_log_file_from_path_impl(
1072    stream: CRecordingStream,
1073    filepath: CStringView,
1074    entity_path_prefix: CStringView,
1075    static_: bool,
1076) -> Result<(), CError> {
1077    let stream = recording_stream(stream)?;
1078
1079    let filepath = filepath.as_nonempty_str("filepath")?;
1080    let entity_path_prefix = entity_path_prefix.as_optional_str("entity_path_prefix")?;
1081
1082    stream
1083        .log_file_from_path(filepath, entity_path_prefix.map(Into::into), static_)
1084        .map_err(|err| {
1085            CError::new(
1086                CErrorCode::RecordingStreamRuntimeFailure,
1087                &format!("Couldn't load file {filepath:?}: {err}"),
1088            )
1089        })?;
1090
1091    Ok(())
1092}
1093
1094#[expect(unsafe_code)]
1095#[unsafe(no_mangle)]
1096pub unsafe extern "C" fn rr_recording_stream_log_file_from_path(
1097    stream: CRecordingStream,
1098    filepath: CStringView,
1099    entity_path_prefix: CStringView,
1100    static_: bool,
1101    error: *mut CError,
1102) {
1103    if let Err(err) =
1104        rr_recording_stream_log_file_from_path_impl(stream, filepath, entity_path_prefix, static_)
1105    {
1106        err.write_error(error);
1107    }
1108}
1109
1110#[expect(clippy::result_large_err)]
1111fn rr_recording_stream_log_file_from_contents_impl(
1112    stream: CRecordingStream,
1113    filepath: CStringView,
1114    contents: CBytesView,
1115    entity_path_prefix: CStringView,
1116    static_: bool,
1117) -> Result<(), CError> {
1118    let stream = recording_stream(stream)?;
1119
1120    let filepath = filepath.as_nonempty_str("filepath")?;
1121    let contents = contents.as_bytes("contents")?;
1122    let entity_path_prefix = entity_path_prefix.as_optional_str("entity_path_prefix")?;
1123
1124    stream
1125        .log_file_from_contents(
1126            filepath,
1127            std::borrow::Cow::Borrowed(contents),
1128            entity_path_prefix.map(Into::into),
1129            static_,
1130        )
1131        .map_err(|err| {
1132            CError::new(
1133                CErrorCode::RecordingStreamRuntimeFailure,
1134                &format!("Couldn't load file {filepath:?}: {err}"),
1135            )
1136        })?;
1137
1138    Ok(())
1139}
1140
1141#[expect(unsafe_code)]
1142#[unsafe(no_mangle)]
1143pub unsafe extern "C" fn rr_recording_stream_log_file_from_contents(
1144    stream: CRecordingStream,
1145    filepath: CStringView,
1146    contents: CBytesView,
1147    entity_path_prefix: CStringView,
1148    static_: bool,
1149    error: *mut CError,
1150) {
1151    if let Err(err) = rr_recording_stream_log_file_from_contents_impl(
1152        stream,
1153        filepath,
1154        contents,
1155        entity_path_prefix,
1156        static_,
1157    ) {
1158        err.write_error(error);
1159    }
1160}
1161
1162#[expect(unsafe_code)]
1163#[expect(clippy::result_large_err)]
1164fn rr_recording_stream_send_columns_impl(
1165    stream: CRecordingStream,
1166    entity_path: CStringView,
1167    time_columns: &mut [CTimeColumn],
1168    component_columns: &mut [CComponentColumns],
1169) -> Result<(), CError> {
1170    // Create chunk-id as early as possible. It has a timestamp and is used to estimate e2e latency.
1171    let id = ChunkId::new();
1172
1173    let stream = recording_stream(stream)?;
1174    let entity_path = entity_path.as_maybe_empty_str("entity_path")?;
1175
1176    let time_columns: IntMap<TimelineName, TimeColumn> = time_columns
1177        .iter_mut()
1178        .map(|time_column| {
1179            let timeline: Timeline = time_column.timeline.clone().try_into()?;
1180            let datatype = arrow::datatypes::DataType::Int64;
1181            let array = unsafe { FFI_ArrowArray::from_raw(&mut time_column.times) } ; // Move out of the array
1182            let time_values_untyped = unsafe { arrow_array_from_c_ffi(array, datatype) }?;
1183            let time_values = TimeColumn::read_array(&ArrowArrayRef::from(time_values_untyped)).map_err(|err| {
1184                CError::new(
1185                    CErrorCode::ArrowFfiArrayImportError,
1186                    &format!("Arrow C FFI import did not produce a Int64 time array - please file an issue at https://github.com/rerun-io/rerun/issues if you see this! This shouldn't be possible since conversion from C was successful with this datatype. Details: {err}")
1187                )
1188            })?;
1189
1190            Ok((
1191                *timeline.name(),
1192                TimeColumn::new(
1193                    time_column.sorting_status.is_sorted(),
1194                    timeline,
1195                    time_values.clone(),
1196                ),
1197            ))
1198        })
1199        .collect::<Result<_, CError>>()?;
1200
1201    let components: IntMap<ComponentDescriptor, ArrowListArray> = {
1202        let component_type_registry = COMPONENT_TYPES.read();
1203        component_columns
1204            .iter_mut()
1205            .map(|batch| {
1206                let CComponentColumns {
1207                    component_type,
1208                    array,
1209                } = batch;
1210                let component_type = component_type_registry.get(*component_type)?;
1211
1212                let nullable = true;
1213                let list_datatype = arrow::datatypes::DataType::List(arrow::datatypes::Field::new_list_field(component_type.datatype.clone(), nullable).into());
1214
1215                let array = unsafe { FFI_ArrowArray::from_raw(array) }; // Move out of the array
1216                let component_values_untyped = unsafe { arrow_array_from_c_ffi(array, list_datatype) }?;
1217                let component_values = component_values_untyped
1218                    .downcast_array_ref::<ArrowListArray>()
1219                    .ok_or_else(|| {
1220                        CError::new(
1221                            CErrorCode::ArrowFfiArrayImportError,
1222                            "Arrow C FFI import did not produce a ListArray - please file an issue at https://github.com/rerun-io/rerun/issues if you see this! This shouldn't be possible since conversion from C was successful with this datatype.",
1223                        )
1224                    })?;
1225
1226                Ok((component_type.descriptor.clone(), component_values.clone()))
1227            })
1228            .collect::<Result<_, CError>>()?
1229    };
1230
1231    let chunk = Chunk::from_auto_row_ids(
1232        id,
1233        entity_path.into(),
1234        time_columns,
1235        components.into_iter().collect(),
1236    )
1237    .map_err(|err| {
1238        CError::new(
1239            CErrorCode::RecordingStreamChunkValidationFailure,
1240            &format!("Failed to create chunk: {err}"),
1241        )
1242    })?;
1243
1244    stream.send_chunk(chunk);
1245
1246    Ok(())
1247}
1248
1249#[expect(unsafe_code)]
1250#[unsafe(no_mangle)]
1251pub unsafe extern "C" fn rr_recording_stream_send_columns(
1252    stream: CRecordingStream,
1253    entity_path: CStringView,
1254    time_columns: *mut CTimeColumn,
1255    num_time_columns: u32,
1256    component_batches: *mut CComponentColumns,
1257    num_component_batches: u32,
1258    error: *mut CError,
1259) {
1260    let time_columns =
1261        unsafe { std::slice::from_raw_parts_mut(time_columns, num_time_columns as usize) };
1262    let component_batches = unsafe {
1263        std::slice::from_raw_parts_mut(component_batches, num_component_batches as usize)
1264    };
1265
1266    if let Err(err) =
1267        rr_recording_stream_send_columns_impl(stream, entity_path, time_columns, component_batches)
1268    {
1269        err.write_error(error);
1270    }
1271}
1272
1273// ----------------------------------------------------------------------------
1274// Private functions
1275
1276#[expect(unsafe_code)]
1277#[unsafe(no_mangle)]
1278pub unsafe extern "C" fn _rr_escape_entity_path_part(part: CStringView) -> *const c_char {
1279    let Ok(part) = part.as_maybe_empty_str("entity_path_part") else {
1280        return std::ptr::null();
1281    };
1282
1283    let part = re_sdk::EntityPathPart::from(part).escaped_string();
1284
1285    let Ok(part) = CString::new(part) else {
1286        return std::ptr::null();
1287    };
1288
1289    part.into_raw()
1290}
1291
/// Frees a string previously returned by `_rr_escape_entity_path_part`.
///
/// Passing a null pointer is allowed and does nothing.
///
/// # Safety
///
/// A non-null `str` must have been produced by `CString::into_raw` and must not have
/// been freed already.
#[expect(unsafe_code)]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn _rr_free_string(str: *mut c_char) {
    if !str.is_null() {
        // SAFETY: `_rr_free_string` should only be called on strings allocated by `_rr_escape_entity_path_part`.
        let owned = unsafe { CString::from_raw(str) };
        drop(owned);
    }
}
1304}