1#![crate_type = "staticlib"]
6#![expect(clippy::missing_safety_doc, clippy::undocumented_unsafe_blocks)] mod arrow_utils;
9mod component_type_registry;
10mod error;
11mod ptr;
12mod recording_streams;
13mod video;
14
15use std::ffi::{CString, c_char, c_float, c_uchar};
16use std::time::Duration;
17
18use arrow::array::{ArrayRef as ArrowArrayRef, ListArray as ArrowListArray};
19use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
20use arrow_utils::arrow_array_from_c_ffi;
21use component_type_registry::COMPONENT_TYPES;
22use re_arrow_util::ArrowArrayDowncastRef as _;
23use re_sdk::external::nohash_hasher::IntMap;
24use re_sdk::external::re_log_types::TimelineName;
25use re_sdk::log::{Chunk, ChunkId, PendingRow, TimeColumn};
26use re_sdk::time::TimeType;
27use re_sdk::{
28 ComponentDescriptor, EntityPath, RecordingStream, RecordingStreamBuilder, StoreKind, TimeCell,
29 TimePoint, Timeline,
30};
31use recording_streams::{RECORDING_STREAMS, recording_stream};
32
33#[repr(C)]
40#[derive(Debug, Copy, Clone)]
41pub struct CStringView {
42 pub string: *const c_char,
43 pub length: u32,
44}
45
46impl CStringView {
47 #[expect(clippy::result_large_err)]
51 pub fn as_maybe_empty_str<'a>(&'a self, argument_name: &'a str) -> Result<&'a str, CError> {
52 if self.is_empty() {
53 Ok("")
54 } else {
55 re_log::debug_assert!(
56 1000 < self.string.addr() && self.length < 1_000_000,
57 "Suspected memory corruption when reading argument {argument_name:?}: {self:#?}"
58 );
59 ptr::try_char_ptr_as_str(self.string, self.length, argument_name)
60 }
61 }
62
63 #[expect(clippy::result_large_err)]
65 pub fn as_optional_str<'a>(
66 &'a self,
67 argument_name: &'a str,
68 ) -> Result<Option<&'a str>, CError> {
69 if self.is_empty() {
70 Ok(None)
71 } else {
72 self.as_nonempty_str(argument_name).map(Some)
73 }
74 }
75
76 #[expect(clippy::result_large_err)]
78 pub fn as_nonempty_str<'a>(&'a self, argument_name: &'a str) -> Result<&'a str, CError> {
79 if self.is_empty() {
80 Err(CError::new(
81 CErrorCode::InvalidStringArgument,
82 &format!("{argument_name:?} was an empty string"),
83 ))
84 } else {
85 self.as_maybe_empty_str(argument_name)
86 }
87 }
88
89 pub fn is_empty(&self) -> bool {
93 self.length == 0
94 }
95}
96
97#[repr(C)]
99#[derive(Debug, Copy, Clone)]
100pub struct CBytesView {
101 pub bytes: *const c_uchar,
102 pub length: u32,
103}
104
105impl CBytesView {
106 #[expect(clippy::result_large_err)]
107 pub fn as_bytes<'a>(&self, argument_name: &'a str) -> Result<&'a [u8], CError> {
108 ptr::try_ptr_as_slice(self.bytes, self.length, argument_name)
109 }
110
111 pub fn is_null(&self) -> bool {
112 self.bytes.is_null()
113 }
114
115 pub fn is_empty(&self) -> bool {
116 self.length == 0
117 }
118}
119
120pub type CRecordingStream = u32;
121
122pub type CComponentTypeHandle = u32;
123
124pub const RR_REC_STREAM_CURRENT_RECORDING: CRecordingStream = 0xFFFFFFFF;
125pub const RR_REC_STREAM_CURRENT_BLUEPRINT: CRecordingStream = 0xFFFFFFFE;
126pub const RR_COMPONENT_TYPE_HANDLE_INVALID: CComponentTypeHandle = 0xFFFFFFFF;
127
128#[derive(Debug, Clone)]
132#[repr(C)]
133pub struct CSpawnOptions {
134 pub port: u16,
135 pub memory_limit: CStringView,
136 pub server_memory_limit: CStringView,
137 pub hide_welcome_screen: bool,
138 pub detach_process: bool,
139 pub executable_name: CStringView,
140 pub executable_path: CStringView,
141}
142
143impl CSpawnOptions {
144 #[expect(clippy::result_large_err)]
145 pub fn as_rust(&self) -> Result<re_sdk::SpawnOptions, CError> {
146 let Self {
147 port,
148 memory_limit,
149 server_memory_limit,
150 hide_welcome_screen,
151 detach_process,
152 executable_name,
153 executable_path,
154 } = self;
155
156 let mut spawn_opts = re_sdk::SpawnOptions::default();
157
158 if *port != 0 {
159 spawn_opts.port = *port;
160 }
161
162 spawn_opts.wait_for_bind = true;
163
164 if let Some(memory_limit) = memory_limit.as_optional_str("memory_limit")? {
165 spawn_opts.memory_limit = memory_limit.to_owned();
166 }
167 if let Some(server_memory_limit) =
168 server_memory_limit.as_optional_str("server_memory_limit")?
169 {
170 spawn_opts.server_memory_limit = server_memory_limit.to_owned();
171 }
172
173 spawn_opts.hide_welcome_screen = *hide_welcome_screen;
174 spawn_opts.detach_process = *detach_process;
175
176 if let Some(executable_name) = executable_name.as_optional_str("executable_name")? {
177 spawn_opts.executable_name = executable_name.to_owned();
178 }
179
180 if let Some(executable_path) = executable_path.as_optional_str("executable_path")? {
181 spawn_opts.executable_path = Some(executable_path.to_owned());
182 }
183
184 Ok(spawn_opts)
185 }
186}
187
188#[repr(u32)]
189#[derive(Debug, Clone, Copy, PartialEq, Eq)]
190pub enum CStoreKind {
191 Recording = 1,
193
194 Blueprint = 2,
196}
197
198impl From<CStoreKind> for StoreKind {
199 fn from(kind: CStoreKind) -> Self {
200 match kind {
201 CStoreKind::Recording => Self::Recording,
202 CStoreKind::Blueprint => Self::Blueprint,
203 }
204 }
205}
206
207#[repr(C)]
209#[derive(Debug)]
210pub struct CStoreInfo {
211 pub application_id: CStringView,
213
214 pub recording_id: CStringView,
218
219 pub store_kind: CStoreKind,
220}
221
222#[repr(C)]
224pub struct CComponentDescriptor {
225 pub archetype_name: CStringView,
226 pub component: CStringView,
227 pub component_type: CStringView,
228}
229
230#[repr(C)]
232pub struct CComponentType {
233 pub descriptor: CComponentDescriptor,
234 pub schema: FFI_ArrowSchema,
235}
236
237#[repr(C)]
239pub struct CComponentBatch {
240 pub component_type: CComponentTypeHandle,
241 pub array: FFI_ArrowArray,
242}
243
244#[repr(C)]
245pub struct CDataRow {
246 pub entity_path: CStringView,
247 pub num_data_cells: u32,
248 pub batches: *mut CComponentBatch,
249}
250
251#[repr(C)]
253pub struct CComponentColumns {
254 pub component_type: CComponentTypeHandle,
255
256 pub array: FFI_ArrowArray,
258}
259
260#[repr(u32)]
262#[derive(Debug, Clone, Copy, PartialEq, Eq)]
263pub enum CSortingStatus {
264 Unknown = 0,
265 Sorted = 1,
266 Unsorted = 2,
267}
268
269impl CSortingStatus {
270 fn is_sorted(&self) -> Option<bool> {
271 match self {
272 Self::Sorted => Some(true),
273 Self::Unsorted => Some(false),
274 Self::Unknown => None,
275 }
276 }
277}
278
279#[repr(u32)]
282#[derive(Debug, Clone, Copy)]
283pub enum CTimeType {
284 Sequence = 1,
286
287 Duration = 2,
289
290 Timestamp = 3,
292}
293
294#[repr(C)]
297#[derive(Debug, Clone)]
298pub struct CTimeline {
299 pub name: CStringView,
301
302 pub typ: CTimeType,
304}
305
306impl TryFrom<CTimeline> for Timeline {
307 type Error = CError;
308
309 fn try_from(timeline: CTimeline) -> Result<Self, CError> {
310 let name = timeline.name.as_nonempty_str("timeline.name")?;
311 let typ = match timeline.typ {
312 CTimeType::Sequence => TimeType::Sequence,
313 CTimeType::Duration => TimeType::DurationNs,
314 CTimeType::Timestamp => TimeType::TimestampNs,
315 };
316 Ok(Self::new(name, typ))
317 }
318}
319
320#[repr(C)]
323pub struct CTimeColumn {
324 pub timeline: CTimeline,
325
326 pub times: FFI_ArrowArray,
328
329 pub sorting_status: CSortingStatus,
331}
332
333#[derive(Debug)]
339#[repr(C)]
340pub struct CGrpcSink {
341 pub url: CStringView,
345}
346
347#[derive(Debug)]
351#[repr(C)]
352pub struct CFileSink {
353 pub path: CStringView,
355}
356
357#[derive(Debug)]
367#[repr(C, u8)]
368pub enum CLogSink {
369 GrpcSink { grpc: CGrpcSink } = 0,
370 FileSink { file: CFileSink } = 1,
371}
372
373#[repr(u32)]
375#[derive(Debug, Clone, Copy, PartialEq, Eq)]
376pub enum CErrorCode {
377 Ok = 0,
378 OutOfMemory,
379 NotImplemented,
380 SdkVersionMismatch,
381
382 _CategoryArgument = 0x0000_00010,
384 UnexpectedNullArgument,
385 InvalidStringArgument,
386 InvalidEnumValue,
387 InvalidRecordingStreamHandle,
388 InvalidSocketAddress,
389 InvalidComponentTypeHandle,
390 InvalidTimeArgument,
391 InvalidTensorDimension,
392 InvalidComponent,
393 InvalidServerUrl = 0x0000_0001a,
394 FileRead,
395 InvalidMemoryLimit,
396
397 _CategoryRecordingStream = 0x0000_00100,
399 RecordingStreamRuntimeFailure,
400 RecordingStreamCreationFailure,
401 RecordingStreamSaveFailure,
402 RecordingStreamStdoutFailure,
403 RecordingStreamSpawnFailure,
404 RecordingStreamChunkValidationFailure,
405 RecordingStreamServeGrpcFailure,
406 RecordingStreamFlushTimeout,
407 RecordingStreamFlushFailure,
408
409 _CategoryArrow = 0x0000_1000,
411 ArrowFfiSchemaImportError,
412 ArrowFfiArrayImportError,
413
414 _CategoryUtilities = 0x0001_0000,
416 VideoLoadError,
417
418 _CategoryFileIO = 0x0010_0000,
420 FileOpenFailure,
421
422 _CategoryArrowCppStatus = 0x1000_0000,
424
425 Unknown = 0xFFFF_FFFF,
426}
427
428#[repr(C)]
429#[derive(Clone)]
430pub struct CError {
431 pub code: CErrorCode,
432 pub message: [c_char; Self::MAX_MESSAGE_SIZE_BYTES],
433}
434
435#[expect(unsafe_code)]
441#[unsafe(no_mangle)]
442pub extern "C" fn rr_version_string() -> *const c_char {
443 static VERSION: std::sync::LazyLock<CString> = std::sync::LazyLock::new(|| {
444 CString::new(re_sdk::build_info().version.to_string()).expect("CString::new failed")
445 }); VERSION.as_ptr()
448}
449
450#[expect(clippy::result_large_err)]
451fn rr_spawn_impl(spawn_opts: *const CSpawnOptions) -> Result<(), CError> {
452 let spawn_opts = if spawn_opts.is_null() {
453 re_sdk::SpawnOptions::default()
454 } else {
455 let spawn_opts = ptr::try_ptr_as_ref(spawn_opts, "spawn_opts")?;
456 spawn_opts.as_rust()?
457 };
458
459 re_sdk::spawn(&spawn_opts)
462 .map(drop)
463 .map_err(|err| CError::new(CErrorCode::RecordingStreamSpawnFailure, &err.to_string()))?;
464
465 Ok(())
466}
467
468#[expect(unsafe_code)]
469#[unsafe(no_mangle)]
470pub extern "C" fn rr_spawn(spawn_opts: *const CSpawnOptions, error: *mut CError) {
471 if let Err(err) = rr_spawn_impl(spawn_opts) {
472 err.write_error(error);
473 }
474}
475
476#[expect(clippy::result_large_err)]
477fn rr_register_component_type_impl(
478 component_type: &CComponentType,
479) -> Result<CComponentTypeHandle, CError> {
480 let CComponentDescriptor {
481 archetype_name,
482 component,
483 component_type: component_type_descr,
484 } = &component_type.descriptor;
485
486 let archetype_name =
487 archetype_name.as_optional_str("component_type.descriptor.archetype_name")?;
488
489 let component = component.as_nonempty_str("component_type.descriptor.component")?;
490
491 let component_type_descr =
492 component_type_descr.as_optional_str("component_type.descriptor.component_type")?;
493
494 let component_descr = ComponentDescriptor {
495 archetype: archetype_name.map(Into::into),
496 component: component.into(),
497 component_type: component_type_descr.map(Into::into),
498 };
499
500 let field = arrow::datatypes::Field::try_from(&component_type.schema).map_err(|err| {
501 CError::new(
502 CErrorCode::ArrowFfiSchemaImportError,
503 &format!("Failed to import ffi schema: {err}"),
504 )
505 })?;
506
507 Ok(COMPONENT_TYPES
508 .write()
509 .register(component_descr, field.data_type().clone()))
510}
511
512#[expect(unsafe_code)]
513#[unsafe(no_mangle)]
514pub extern "C" fn rr_register_component_type(
515 component_type: CComponentType,
517 error: *mut CError,
518) -> u32 {
519 match rr_register_component_type_impl(&component_type) {
520 Ok(id) => id,
521 Err(err) => {
522 err.write_error(error);
523 RR_COMPONENT_TYPE_HANDLE_INVALID
524 }
525 }
526}
527
528#[expect(clippy::result_large_err)]
529fn rr_recording_stream_new_impl(
530 store_info: *const CStoreInfo,
531 default_enabled: bool,
532) -> Result<CRecordingStream, CError> {
533 {
534 use std::sync::Once;
535 static INIT: Once = Once::new();
536 INIT.call_once(|| {
537 re_log::setup_logging();
538 if cfg!(debug_assertions) {
539 re_crash_handler::install_crash_handlers(re_build_info::build_info!());
540
541 const DEBUG_BUILD_WARNING: &str =
544 "Using a DEBUG BUILD of the Rerun SDK with Rerun crash handlers!";
545 let can_log_warning = std::env::var("RERUN_PANIC_ON_WARN")
546 .map(|value| value == "0")
547 .unwrap_or(true);
548 if can_log_warning {
549 re_log::warn!(DEBUG_BUILD_WARNING);
550 } else {
551 re_log::info!(DEBUG_BUILD_WARNING);
552 }
553 }
554 });
555 }
556
557 let store_info = ptr::try_ptr_as_ref(store_info, "store_info")?;
558
559 let CStoreInfo {
560 application_id,
561 recording_id,
562 store_kind,
563 } = *store_info;
564
565 let application_id = application_id.as_nonempty_str("store_info.application_id")?;
566
567 let mut rec_builder = RecordingStreamBuilder::new(application_id)
568 .store_source(re_sdk::external::re_log_types::StoreSource::CSdk)
570 .default_enabled(default_enabled);
571
572 if let Some(recording_id) = recording_id.as_optional_str("recording_id")? {
573 rec_builder = rec_builder.recording_id(recording_id);
574 }
575
576 if store_kind == CStoreKind::Blueprint {
577 rec_builder = rec_builder.blueprint();
578 }
579
580 let rec = rec_builder.buffered().map_err(|err| {
581 CError::new(
582 CErrorCode::RecordingStreamCreationFailure,
583 &format!("Failed to create recording stream: {err}"),
584 )
585 })?;
586 Ok(RECORDING_STREAMS.lock().insert(rec))
587}
588
589#[expect(unsafe_code)]
590#[unsafe(no_mangle)]
591pub extern "C" fn rr_recording_stream_new(
592 store_info: *const CStoreInfo,
593 default_enabled: bool,
594 error: *mut CError,
595) -> CRecordingStream {
596 match rr_recording_stream_new_impl(store_info, default_enabled) {
597 Err(err) => {
598 err.write_error(error);
599 0
600 }
601 Ok(id) => id,
602 }
603}
604
605struct TrivialTypeWithDrop;
607
608impl Drop for TrivialTypeWithDrop {
609 fn drop(&mut self) {
610 std::hint::black_box(self);
612 }
613}
614
615thread_local! {
616 pub static THREAD_LIFE_TRACKER: TrivialTypeWithDrop = const { TrivialTypeWithDrop };
636}
637
638#[expect(unsafe_code)]
639#[unsafe(no_mangle)]
640pub extern "C" fn rr_recording_stream_free(id: CRecordingStream) {
641 if THREAD_LIFE_TRACKER.try_with(|_v| {}).is_ok() {
642 if let Some(stream) = RECORDING_STREAMS.lock().remove(id) {
643 drop(stream);
647 }
648 } else {
649 re_log::debug!(
651 "rr_recording_stream_free called on a thread that is shutting down and can no longer access thread locals. We can't handle this and have to ignore this call."
652 );
653 }
654}
655
656#[expect(unsafe_code)]
657#[unsafe(no_mangle)]
658pub extern "C" fn rr_recording_stream_set_global(id: CRecordingStream, store_kind: CStoreKind) {
659 let stream = RECORDING_STREAMS.lock().get(id);
660 RecordingStream::set_global(store_kind.into(), stream);
661}
662
663#[expect(unsafe_code)]
664#[unsafe(no_mangle)]
665pub extern "C" fn rr_recording_stream_set_thread_local(
666 id: CRecordingStream,
667 store_kind: CStoreKind,
668) {
669 let stream = RECORDING_STREAMS.lock().get(id);
670 RecordingStream::set_thread_local(store_kind.into(), stream);
671}
672
673#[expect(unsafe_code)]
674#[unsafe(no_mangle)]
675pub extern "C" fn rr_recording_stream_is_enabled(
676 stream: CRecordingStream,
677 error: *mut CError,
678) -> bool {
679 match rr_recording_stream_is_enabled_impl(stream) {
680 Ok(enabled) => enabled,
681 Err(err) => {
682 err.write_error(error);
683 false
684 }
685 }
686}
687
688#[expect(clippy::result_large_err)]
689fn rr_recording_stream_is_enabled_impl(id: CRecordingStream) -> Result<bool, CError> {
690 Ok(recording_stream(id)?.is_enabled())
691}
692
693#[expect(unsafe_code)]
694#[unsafe(no_mangle)]
695pub unsafe extern "C" fn rr_recording_stream_flush_blocking(
696 id: CRecordingStream,
697 timeout_sec: c_float,
698 error: *mut CError,
699) {
700 if let Some(stream) = RECORDING_STREAMS.lock().get(id) {
701 let timeout = if timeout_sec.is_nan() {
702 if let Some(error) = unsafe { error.as_mut() } {
703 *error = CError::new(CErrorCode::InvalidTimeArgument, "NaN timeout");
704 }
705 Duration::ZERO
706 } else if timeout_sec < 0.0 {
707 if let Some(error) = unsafe { error.as_mut() } {
708 *error = CError::new(CErrorCode::InvalidTimeArgument, "Negative timeout");
709 }
710 Duration::ZERO
711 } else {
712 Duration::try_from_secs_f32(timeout_sec)
713 .ok()
714 .unwrap_or(Duration::MAX)
715 };
716 if let Err(err) = stream.flush_with_timeout(timeout)
717 && let Some(error) = unsafe { error.as_mut() }
718 {
719 let code = match &err {
720 re_sdk::sink::SinkFlushError::Timeout => CErrorCode::RecordingStreamFlushTimeout,
721 re_sdk::sink::SinkFlushError::Failed { .. } => {
722 CErrorCode::RecordingStreamFlushFailure
723 }
724 };
725 *error = CError::new(code, &err.to_string());
726 }
727 }
728}
729
730#[expect(unsafe_code)]
731#[expect(clippy::result_large_err)]
732fn rr_recording_stream_set_sinks_impl(
733 stream: CRecordingStream,
734 raw_sinks: *mut CLogSink,
735 num_sinks: u32,
736) -> Result<(), CError> {
737 let stream = recording_stream(stream)?;
738
739 let raw_sinks = unsafe { std::slice::from_raw_parts_mut(raw_sinks, num_sinks as usize) };
740
741 let mut sinks: Vec<Box<dyn re_sdk::sink::LogSink>> = Vec::with_capacity(num_sinks as usize);
742 for sink in raw_sinks {
743 match sink {
744 CLogSink::GrpcSink { grpc } => {
745 let uri = grpc
746 .url
747 .as_nonempty_str("url")?
748 .parse::<re_sdk::external::re_uri::ProxyUri>()
749 .map_err(|err| CError::new(CErrorCode::InvalidServerUrl, &err.to_string()))?;
750 sinks.push(Box::new(re_sdk::sink::GrpcSink::new(uri)));
751 }
752 CLogSink::FileSink { file } => {
753 let path = file.path.as_nonempty_str("path")?;
754 sinks.push(Box::new(re_sdk::sink::FileSink::new(path).map_err(
755 |err| {
756 CError::new(
757 CErrorCode::RecordingStreamSaveFailure,
758 &format!("Failed to save recording stream to {path:?}: {err}"),
759 )
760 },
761 )?));
762 }
763 }
764 }
765
766 stream.set_sinks(sinks);
767
768 Ok(())
769}
770
771#[expect(unsafe_code)]
772#[unsafe(no_mangle)]
773pub extern "C" fn rr_recording_stream_set_sinks(
774 id: CRecordingStream,
775 sinks: *mut CLogSink,
776 num_sinks: u32,
777 error: *mut CError,
778) {
779 if let Err(err) = rr_recording_stream_set_sinks_impl(id, sinks, num_sinks) {
780 err.write_error(error);
781 }
782}
783
784#[expect(clippy::result_large_err)]
785fn rr_recording_stream_connect_grpc_impl(
786 stream: CRecordingStream,
787 url: CStringView,
788) -> Result<(), CError> {
789 let stream = recording_stream(stream)?;
790
791 let url = url.as_nonempty_str("url")?;
792
793 if let Err(err) = stream.connect_grpc_opts(url) {
794 return Err(CError::new(CErrorCode::InvalidServerUrl, &err.to_string()));
795 }
796
797 Ok(())
798}
799
800#[expect(unsafe_code)]
801#[unsafe(no_mangle)]
802pub extern "C" fn rr_recording_stream_connect_grpc(
803 id: CRecordingStream,
804 url: CStringView,
805 error: *mut CError,
806) {
807 if let Err(err) = rr_recording_stream_connect_grpc_impl(id, url) {
808 err.write_error(error);
809 }
810}
811
812#[expect(clippy::result_large_err)]
813fn rr_recording_stream_serve_grpc_impl(
814 stream: CRecordingStream,
815 bind_ip: CStringView,
816 port: u16,
817 server_memory_limit: CStringView,
818 newest_first: bool,
819) -> Result<(), CError> {
820 let stream = recording_stream(stream)?;
821
822 let bind_ip = bind_ip.as_nonempty_str("bind_ip")?;
823 let server_options = re_sdk::ServerOptions {
824 playback_behavior: re_sdk::PlaybackBehavior::from_newest_first(newest_first),
825
826 memory_limit: server_memory_limit
827 .as_maybe_empty_str("server_memory_limit")?
828 .parse::<re_sdk::MemoryLimit>()
829 .map_err(|err| CError::new(CErrorCode::InvalidMemoryLimit, &err))?,
830 };
831
832 stream
833 .serve_grpc_opts(bind_ip, port, server_options)
834 .map_err(|err| {
835 CError::new(
836 CErrorCode::RecordingStreamServeGrpcFailure,
837 &err.to_string(),
838 )
839 })?;
840
841 Ok(())
842}
843
844#[expect(unsafe_code)]
845#[unsafe(no_mangle)]
846pub extern "C" fn rr_recording_stream_serve_grpc(
847 id: CRecordingStream,
848 bind_ip: CStringView,
849 port: u16,
850 server_memory_limit: CStringView,
851 newest_first: bool,
852 error: *mut CError,
853) {
854 if let Err(err) =
855 rr_recording_stream_serve_grpc_impl(id, bind_ip, port, server_memory_limit, newest_first)
856 {
857 err.write_error(error);
858 }
859}
860
861#[expect(clippy::result_large_err)]
862fn rr_recording_stream_spawn_impl(
863 stream: CRecordingStream,
864 spawn_opts: *const CSpawnOptions,
865) -> Result<(), CError> {
866 let stream = recording_stream(stream)?;
867
868 let spawn_opts = if spawn_opts.is_null() {
869 re_sdk::SpawnOptions::default()
870 } else {
871 let spawn_opts = ptr::try_ptr_as_ref(spawn_opts, "spawn_opts")?;
872 spawn_opts.as_rust()?
873 };
874
875 stream
876 .spawn_opts(&spawn_opts)
877 .map_err(|err| CError::new(CErrorCode::RecordingStreamSpawnFailure, &err.to_string()))?;
878
879 Ok(())
880}
881
882#[expect(unsafe_code)]
883#[unsafe(no_mangle)]
884pub extern "C" fn rr_recording_stream_spawn(
885 id: CRecordingStream,
886 spawn_opts: *const CSpawnOptions,
887 error: *mut CError,
888) {
889 if let Err(err) = rr_recording_stream_spawn_impl(id, spawn_opts) {
890 err.write_error(error);
891 }
892}
893
894#[expect(clippy::result_large_err)]
895fn rr_recording_stream_save_impl(
896 stream: CRecordingStream,
897 rrd_filepath: CStringView,
898) -> Result<(), CError> {
899 let rrd_filepath = rrd_filepath.as_nonempty_str("path")?;
900 recording_stream(stream)?.save(rrd_filepath).map_err(|err| {
901 CError::new(
902 CErrorCode::RecordingStreamSaveFailure,
903 &format!("Failed to save recording stream to {rrd_filepath:?}: {err}"),
904 )
905 })
906}
907
908#[expect(unsafe_code)]
909#[unsafe(no_mangle)]
910pub extern "C" fn rr_recording_stream_save(
911 id: CRecordingStream,
912 path: CStringView,
913 error: *mut CError,
914) {
915 if let Err(err) = rr_recording_stream_save_impl(id, path) {
916 err.write_error(error);
917 }
918}
919
920#[expect(clippy::result_large_err)]
921fn rr_recording_stream_stdout_impl(stream: CRecordingStream) -> Result<(), CError> {
922 recording_stream(stream)?.stdout().map_err(|err| {
923 CError::new(
924 CErrorCode::RecordingStreamStdoutFailure,
925 &format!("Failed to forward recording stream to stdout: {err}"),
926 )
927 })
928}
929
930#[expect(unsafe_code)]
931#[unsafe(no_mangle)]
932pub extern "C" fn rr_recording_stream_stdout(id: CRecordingStream, error: *mut CError) {
933 if let Err(err) = rr_recording_stream_stdout_impl(id) {
934 err.write_error(error);
935 }
936}
937
938#[expect(clippy::result_large_err)]
939fn rr_recording_stream_set_time_impl(
940 stream: CRecordingStream,
941 timeline_name: CStringView,
942 time_type: CTimeType,
943 value: i64,
944) -> Result<(), CError> {
945 let timeline = timeline_name.as_nonempty_str("timeline_name")?;
946 let stream = recording_stream(stream)?;
947 let time_type = match time_type {
948 CTimeType::Sequence => TimeType::Sequence,
949 CTimeType::Duration => TimeType::DurationNs,
950 CTimeType::Timestamp => TimeType::TimestampNs,
951 };
952 stream.set_time(timeline, TimeCell::new(time_type, value));
953 Ok(())
954}
955
956#[expect(unsafe_code)]
957#[unsafe(no_mangle)]
958pub extern "C" fn rr_recording_stream_set_time(
959 stream: CRecordingStream,
960 timeline_name: CStringView,
961 time_type: CTimeType,
962 value: i64,
963 error: *mut CError,
964) {
965 if let Err(err) = rr_recording_stream_set_time_impl(stream, timeline_name, time_type, value) {
966 err.write_error(error);
967 }
968}
969
970#[expect(clippy::result_large_err)]
971fn rr_recording_stream_disable_timeline_impl(
972 stream: CRecordingStream,
973 timeline_name: CStringView,
974) -> Result<(), CError> {
975 let timeline = timeline_name.as_nonempty_str("timeline_name")?;
976 recording_stream(stream)?.disable_timeline(timeline);
977 Ok(())
978}
979
980#[expect(unsafe_code)]
981#[unsafe(no_mangle)]
982pub extern "C" fn rr_recording_stream_disable_timeline(
983 stream: CRecordingStream,
984 timeline_name: CStringView,
985 error: *mut CError,
986) {
987 if let Err(err) = rr_recording_stream_disable_timeline_impl(stream, timeline_name) {
988 err.write_error(error);
989 }
990}
991
992#[expect(unsafe_code)]
993#[unsafe(no_mangle)]
994pub extern "C" fn rr_recording_stream_reset_time(stream: CRecordingStream) {
995 if let Some(stream) = RECORDING_STREAMS.lock().get(stream) {
996 stream.reset_time();
997 }
998}
999
1000#[expect(unsafe_code)]
1001#[expect(clippy::result_large_err)]
1002#[expect(clippy::needless_pass_by_value)] fn rr_recording_stream_log_impl(
1004 stream: CRecordingStream,
1005 data_row: CDataRow,
1006 inject_time: bool,
1007) -> Result<(), CError> {
1008 let row_id = re_sdk::log::RowId::new();
1011
1012 let stream = recording_stream(stream)?;
1013
1014 let CDataRow {
1015 entity_path,
1016 num_data_cells,
1017 batches,
1018 } = data_row;
1019
1020 let entity_path = entity_path.as_maybe_empty_str("entity_path")?;
1021 let entity_path = EntityPath::parse_forgiving(entity_path);
1022
1023 let num_data_cells = num_data_cells as usize;
1024
1025 let batches = unsafe { std::slice::from_raw_parts_mut(batches, num_data_cells) };
1026
1027 let mut components = IntMap::default();
1028 {
1029 let component_type_registry = COMPONENT_TYPES.read();
1030
1031 for batch in batches {
1032 let CComponentBatch {
1033 component_type,
1034 array,
1035 } = batch;
1036 let component_type = component_type_registry.get(*component_type)?;
1037 let datatype = component_type.datatype.clone();
1038 let array = unsafe { FFI_ArrowArray::from_raw(array) }; let values = unsafe { arrow_array_from_c_ffi(array, datatype) }?;
1040 let batch =
1041 re_sdk::SerializedComponentBatch::new(values, component_type.descriptor.clone());
1042 components.insert(batch.descriptor.component, batch);
1043 }
1044 }
1045
1046 let row = PendingRow {
1047 row_id,
1048 timepoint: TimePoint::default(), components,
1050 };
1051
1052 stream.record_row(entity_path, row, inject_time);
1053
1054 Ok(())
1055}
1056
1057#[expect(unsafe_code)]
1058#[unsafe(no_mangle)]
1059pub unsafe extern "C" fn rr_recording_stream_log(
1060 stream: CRecordingStream,
1061 data_row: CDataRow,
1062 inject_time: bool,
1063 error: *mut CError,
1064) {
1065 if let Err(err) = rr_recording_stream_log_impl(stream, data_row, inject_time) {
1066 err.write_error(error);
1067 }
1068}
1069
1070#[expect(clippy::result_large_err)]
1071fn rr_recording_stream_log_file_from_path_impl(
1072 stream: CRecordingStream,
1073 filepath: CStringView,
1074 entity_path_prefix: CStringView,
1075 static_: bool,
1076) -> Result<(), CError> {
1077 let stream = recording_stream(stream)?;
1078
1079 let filepath = filepath.as_nonempty_str("filepath")?;
1080 let entity_path_prefix = entity_path_prefix.as_optional_str("entity_path_prefix")?;
1081
1082 stream
1083 .log_file_from_path(filepath, entity_path_prefix.map(Into::into), static_)
1084 .map_err(|err| {
1085 CError::new(
1086 CErrorCode::RecordingStreamRuntimeFailure,
1087 &format!("Couldn't load file {filepath:?}: {err}"),
1088 )
1089 })?;
1090
1091 Ok(())
1092}
1093
1094#[expect(unsafe_code)]
1095#[unsafe(no_mangle)]
1096pub unsafe extern "C" fn rr_recording_stream_log_file_from_path(
1097 stream: CRecordingStream,
1098 filepath: CStringView,
1099 entity_path_prefix: CStringView,
1100 static_: bool,
1101 error: *mut CError,
1102) {
1103 if let Err(err) =
1104 rr_recording_stream_log_file_from_path_impl(stream, filepath, entity_path_prefix, static_)
1105 {
1106 err.write_error(error);
1107 }
1108}
1109
1110#[expect(clippy::result_large_err)]
1111fn rr_recording_stream_log_file_from_contents_impl(
1112 stream: CRecordingStream,
1113 filepath: CStringView,
1114 contents: CBytesView,
1115 entity_path_prefix: CStringView,
1116 static_: bool,
1117) -> Result<(), CError> {
1118 let stream = recording_stream(stream)?;
1119
1120 let filepath = filepath.as_nonempty_str("filepath")?;
1121 let contents = contents.as_bytes("contents")?;
1122 let entity_path_prefix = entity_path_prefix.as_optional_str("entity_path_prefix")?;
1123
1124 stream
1125 .log_file_from_contents(
1126 filepath,
1127 std::borrow::Cow::Borrowed(contents),
1128 entity_path_prefix.map(Into::into),
1129 static_,
1130 )
1131 .map_err(|err| {
1132 CError::new(
1133 CErrorCode::RecordingStreamRuntimeFailure,
1134 &format!("Couldn't load file {filepath:?}: {err}"),
1135 )
1136 })?;
1137
1138 Ok(())
1139}
1140
1141#[expect(unsafe_code)]
1142#[unsafe(no_mangle)]
1143pub unsafe extern "C" fn rr_recording_stream_log_file_from_contents(
1144 stream: CRecordingStream,
1145 filepath: CStringView,
1146 contents: CBytesView,
1147 entity_path_prefix: CStringView,
1148 static_: bool,
1149 error: *mut CError,
1150) {
1151 if let Err(err) = rr_recording_stream_log_file_from_contents_impl(
1152 stream,
1153 filepath,
1154 contents,
1155 entity_path_prefix,
1156 static_,
1157 ) {
1158 err.write_error(error);
1159 }
1160}
1161
1162#[expect(unsafe_code)]
1163#[expect(clippy::result_large_err)]
1164fn rr_recording_stream_send_columns_impl(
1165 stream: CRecordingStream,
1166 entity_path: CStringView,
1167 time_columns: &mut [CTimeColumn],
1168 component_columns: &mut [CComponentColumns],
1169) -> Result<(), CError> {
1170 let id = ChunkId::new();
1172
1173 let stream = recording_stream(stream)?;
1174 let entity_path = entity_path.as_maybe_empty_str("entity_path")?;
1175
1176 let time_columns: IntMap<TimelineName, TimeColumn> = time_columns
1177 .iter_mut()
1178 .map(|time_column| {
1179 let timeline: Timeline = time_column.timeline.clone().try_into()?;
1180 let datatype = arrow::datatypes::DataType::Int64;
1181 let array = unsafe { FFI_ArrowArray::from_raw(&mut time_column.times) } ; let time_values_untyped = unsafe { arrow_array_from_c_ffi(array, datatype) }?;
1183 let time_values = TimeColumn::read_array(&ArrowArrayRef::from(time_values_untyped)).map_err(|err| {
1184 CError::new(
1185 CErrorCode::ArrowFfiArrayImportError,
1186 &format!("Arrow C FFI import did not produce a Int64 time array - please file an issue at https://github.com/rerun-io/rerun/issues if you see this! This shouldn't be possible since conversion from C was successful with this datatype. Details: {err}")
1187 )
1188 })?;
1189
1190 Ok((
1191 *timeline.name(),
1192 TimeColumn::new(
1193 time_column.sorting_status.is_sorted(),
1194 timeline,
1195 time_values.clone(),
1196 ),
1197 ))
1198 })
1199 .collect::<Result<_, CError>>()?;
1200
1201 let components: IntMap<ComponentDescriptor, ArrowListArray> = {
1202 let component_type_registry = COMPONENT_TYPES.read();
1203 component_columns
1204 .iter_mut()
1205 .map(|batch| {
1206 let CComponentColumns {
1207 component_type,
1208 array,
1209 } = batch;
1210 let component_type = component_type_registry.get(*component_type)?;
1211
1212 let nullable = true;
1213 let list_datatype = arrow::datatypes::DataType::List(arrow::datatypes::Field::new_list_field(component_type.datatype.clone(), nullable).into());
1214
1215 let array = unsafe { FFI_ArrowArray::from_raw(array) }; let component_values_untyped = unsafe { arrow_array_from_c_ffi(array, list_datatype) }?;
1217 let component_values = component_values_untyped
1218 .downcast_array_ref::<ArrowListArray>()
1219 .ok_or_else(|| {
1220 CError::new(
1221 CErrorCode::ArrowFfiArrayImportError,
1222 "Arrow C FFI import did not produce a ListArray - please file an issue at https://github.com/rerun-io/rerun/issues if you see this! This shouldn't be possible since conversion from C was successful with this datatype.",
1223 )
1224 })?;
1225
1226 Ok((component_type.descriptor.clone(), component_values.clone()))
1227 })
1228 .collect::<Result<_, CError>>()?
1229 };
1230
1231 let chunk = Chunk::from_auto_row_ids(
1232 id,
1233 entity_path.into(),
1234 time_columns,
1235 components.into_iter().collect(),
1236 )
1237 .map_err(|err| {
1238 CError::new(
1239 CErrorCode::RecordingStreamChunkValidationFailure,
1240 &format!("Failed to create chunk: {err}"),
1241 )
1242 })?;
1243
1244 stream.send_chunk(chunk);
1245
1246 Ok(())
1247}
1248
1249#[expect(unsafe_code)]
1250#[unsafe(no_mangle)]
1251pub unsafe extern "C" fn rr_recording_stream_send_columns(
1252 stream: CRecordingStream,
1253 entity_path: CStringView,
1254 time_columns: *mut CTimeColumn,
1255 num_time_columns: u32,
1256 component_batches: *mut CComponentColumns,
1257 num_component_batches: u32,
1258 error: *mut CError,
1259) {
1260 let time_columns =
1261 unsafe { std::slice::from_raw_parts_mut(time_columns, num_time_columns as usize) };
1262 let component_batches = unsafe {
1263 std::slice::from_raw_parts_mut(component_batches, num_component_batches as usize)
1264 };
1265
1266 if let Err(err) =
1267 rr_recording_stream_send_columns_impl(stream, entity_path, time_columns, component_batches)
1268 {
1269 err.write_error(error);
1270 }
1271}
1272
1273#[expect(unsafe_code)]
1277#[unsafe(no_mangle)]
1278pub unsafe extern "C" fn _rr_escape_entity_path_part(part: CStringView) -> *const c_char {
1279 let Ok(part) = part.as_maybe_empty_str("entity_path_part") else {
1280 return std::ptr::null();
1281 };
1282
1283 let part = re_sdk::EntityPathPart::from(part).escaped_string();
1284
1285 let Ok(part) = CString::new(part) else {
1286 return std::ptr::null();
1287 };
1288
1289 part.into_raw()
1290}
1291
1292#[expect(unsafe_code)]
1293#[unsafe(no_mangle)]
1294pub unsafe extern "C" fn _rr_free_string(str: *mut c_char) {
1295 if str.is_null() {
1296 return;
1297 }
1298
1299 unsafe {
1301 std::mem::drop(CString::from_raw(str));
1303 }
1304}