schiebung_rerun/lib.rs
1#![doc = include_str!("../README.md")]
2#![warn(missing_docs)]
3
4use std::collections::HashMap;
5use std::sync::Mutex;
6
7use rerun::{RecordingStream, TimeColumn};
8use schiebung::{BufferObserver, TransformType, TransformUpdate};
9
/// Single entity path under which all dynamic transforms are logged.
/// Follows the ROS / rerun 0.32+ naming convention.
const DYNAMIC_ENTITY_PATH: &str = "tf";
/// Single entity path under which all static transforms are logged.
/// Follows the ROS / rerun 0.32+ naming convention.
const STATIC_ENTITY_PATH: &str = "tf_static";
16
17/// Observer that logs transforms to a Rerun recording stream.
18///
19/// Every `on_update` call from the buffer turns into at most two
20/// `send_columns` calls: one columnar batch of all dynamic rows under the
21/// `tf` entity on the configured timeline, and one batch of all static rows
22/// under the `tf_static` entity with no time index. The parent/child relationship
23/// of each edge is carried in the `parent_frame` / `child_frame` data columns
24/// of rerun's `Transform3D` archetype (named frames) rather than in the
25/// entity-path string, so collapsing to two fixed paths does not change the
26/// transform graph rerun builds for the 3D view — only the entity-panel
27/// tree no longer breaks transforms out per edge.
28///
29/// Static transforms use rerun's static-logging semantics ("latest write
30/// wins per entity"), so the observer keeps an internal snapshot of every
31/// static frame it has ever seen and re-sends the full set whenever any
32/// static-touching batch arrives. Otherwise a delta with one new static
33/// would overwrite all previously logged statics on the collapsed entity.
34///
35/// If the model (e.g. a URDF) is loaded via rerun the `publish_static_transforms`
36/// flag should be set to false; otherwise the static transforms will be logged
37/// twice.
38///
39/// # Example
40///
41/// ```no_run
42/// use rerun::RecordingStreamBuilder;
43/// use schiebung::BufferTree;
44/// use schiebung_rerun::RerunObserver;
45///
46/// let rec = RecordingStreamBuilder::new("my_app").spawn()?;
47/// let mut buffer = BufferTree::new();
48///
49/// // Every subsequent buffer.update(&[...]) is bulk-logged to Rerun on
50/// // the "stable_time" timeline.
51/// buffer.register_observer(Box::new(RerunObserver::new(
52/// rec,
53/// /* publish_static_transforms = */ true,
54/// "stable_time".to_string(),
55/// )));
56/// # Ok::<(), Box<dyn std::error::Error>>(())
57/// ```
58pub struct RerunObserver {
59 rec: RecordingStream,
60 publish_static_transforms: bool,
61 timeline: String,
62 /// Every static transform ever seen, keyed by (parent, child). Re-sent
63 /// in full on every static-touching `on_update` because rerun-static
64 /// replaces all component values on the entity on each write.
65 static_state: Mutex<HashMap<(String, String), Row>>,
66}
67
68impl RerunObserver {
69 /// Build a new `RerunObserver`.
70 ///
71 /// - `rec` is the destination [`RecordingStream`] (typically created via
72 /// [`rerun::RecordingStreamBuilder`]).
73 /// - `publish_static_transforms` controls whether static edges are forwarded
74 /// to Rerun. Set to `false` when another producer already populates Rerun's
75 /// transform tree with the same static frames — for example when calling
76 /// [`rerun::RecordingStream::log_file_from_path`] on a URDF, which loads
77 /// every link's static offset itself. Forwarding the same static edges
78 /// from the buffer in that case results in duplicated frames.
79 /// - `timeline` is the name of the Rerun timeline to attach dynamic
80 /// transform timestamps to (e.g. `"stable_time"`). Static transforms
81 /// ignore this and are sent without a time index.
82 pub fn new(rec: RecordingStream, publish_static_transforms: bool, timeline: String) -> Self {
83 RerunObserver {
84 rec,
85 publish_static_transforms,
86 timeline,
87 static_state: Mutex::new(HashMap::new()),
88 }
89 }
90}
91
/// One row of the columnar batch assembled before sending to Rerun.
///
/// All strings are owned so that a row can outlive the `on_update` call that
/// produced it and be kept in the static-state snapshot.
#[derive(Clone)]
struct Row {
    /// Name of the parent frame of the edge.
    parent: String,
    /// Name of the child frame of the edge.
    child: String,
    /// Translation components, narrowed to `f32`.
    translation: [f32; 3],
    /// Rotation quaternion components, narrowed to `f32`.
    quaternion: [f32; 4],
    /// Stamp in nanoseconds since unix epoch. Unused for static entries.
    stamp_ns: i64,
}
104
105fn row_from(update: &TransformUpdate) -> Row {
106 let t = update.stamped_isometry.translation();
107 let r = update.stamped_isometry.rotation();
108 Row {
109 parent: update.from.clone(),
110 child: update.to.clone(),
111 translation: [t[0] as f32, t[1] as f32, t[2] as f32],
112 quaternion: [r[0] as f32, r[1] as f32, r[2] as f32, r[3] as f32],
113 stamp_ns: update.stamped_isometry.stamp(),
114 }
115}
116
117impl BufferObserver for RerunObserver {
118 fn on_update(&self, updates: &[TransformUpdate]) {
119 let mut dynamic_rows: Vec<Row> = Vec::new();
120 let mut static_updates: Vec<Row> = Vec::new();
121
122 for update in updates {
123 match update.kind {
124 TransformType::Dynamic => dynamic_rows.push(row_from(update)),
125 TransformType::Static => {
126 if self.publish_static_transforms {
127 static_updates.push(row_from(update));
128 }
129 }
130 }
131 }
132
133 if !dynamic_rows.is_empty() {
134 self.send_dynamic(DYNAMIC_ENTITY_PATH, &dynamic_rows);
135 }
136
137 if !static_updates.is_empty() {
138 // Merge deltas into state and snapshot under a single lock, then
139 // release before calling rerun so the batcher never blocks on us.
140 let snapshot: Vec<Row> = {
141 let mut state = self.static_state.lock().unwrap();
142 for row in static_updates {
143 state.insert((row.parent.clone(), row.child.clone()), row);
144 }
145 state.values().cloned().collect()
146 };
147 self.send_static(STATIC_ENTITY_PATH, &snapshot);
148 }
149 }
150}
151
152impl RerunObserver {
153 fn send_dynamic(&self, entity_path: &str, rows: &[Row]) {
154 let stamps: Vec<i64> = rows.iter().map(|r| r.stamp_ns).collect();
155 let time_column =
156 TimeColumn::new_timestamp_nanos_since_epoch(self.timeline.as_str(), stamps);
157
158 if let Some((tf_columns, frame_columns)) = build_columns(rows) {
159 self.rec
160 .send_columns(
161 entity_path.to_owned(),
162 [time_column],
163 tf_columns.chain(frame_columns),
164 )
165 .ok();
166 }
167 }
168
169 fn send_static(&self, entity_path: &str, rows: &[Row]) {
170 // Static transforms have no time index — pass an empty timeline list.
171 if let Some((tf_columns, frame_columns)) = build_columns(rows) {
172 self.rec
173 .send_columns(
174 entity_path.to_owned(),
175 std::iter::empty::<TimeColumn>(),
176 tf_columns.chain(frame_columns),
177 )
178 .ok();
179 }
180 }
181}
182
183/// Build the Transform3D and CoordinateFrame component columns for a batch
184/// of rows. Returns `None` if rerun's columnar serialization fails for
185/// either archetype (logged via `re_log` inside rerun).
186#[allow(clippy::type_complexity)]
187fn build_columns(
188 rows: &[Row],
189) -> Option<(
190 impl Iterator<Item = rerun::SerializedComponentColumn>,
191 impl Iterator<Item = rerun::SerializedComponentColumn>,
192)> {
193 let translations: Vec<[f32; 3]> = rows.iter().map(|r| r.translation).collect();
194 let quaternions: Vec<rerun::Quaternion> = rows
195 .iter()
196 .map(|r| rerun::Quaternion::from_xyzw(r.quaternion))
197 .collect();
198 let parents: Vec<String> = rows.iter().map(|r| r.parent.clone()).collect();
199 let children: Vec<String> = rows.iter().map(|r| r.child.clone()).collect();
200
201 let tf_columns = rerun::archetypes::Transform3D::update_fields()
202 .with_many_translation(translations)
203 .with_many_quaternion(quaternions)
204 .with_many_parent_frame(parents.clone())
205 .with_many_child_frame(children.clone())
206 .columns_of_unit_batches()
207 .ok()?;
208
209 let frame_columns = rerun::archetypes::CoordinateFrame::update_fields()
210 .with_many_frame(children)
211 .columns_of_unit_batches()
212 .ok()?;
213
214 Some((tf_columns, frame_columns))
215}