Skip to main content

re_chunk/
latest_at.rs

1use arrow::array::Array as _;
2use re_byte_size::SizeBytes;
3use re_log_types::{TimeInt, TimelineName};
4use re_types_core::ComponentIdentifier;
5
6use crate::{Chunk, RowId};
7
8// ---
9
10/// A query at a given time, for a given timeline.
11///
12/// Get the latest version of the data available at this time.
13#[derive(Clone, PartialEq, Eq, Hash)]
14pub struct LatestAtQuery {
15    timeline: TimelineName,
16    at: TimeInt,
17}
18
19impl SizeBytes for LatestAtQuery {
20    fn heap_size_bytes(&self) -> u64 {
21        let Self { timeline, at } = self;
22
23        timeline.heap_size_bytes() + at.heap_size_bytes()
24    }
25}
26
27impl std::fmt::Debug for LatestAtQuery {
28    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29        f.write_fmt(format_args!(
30            "<latest-at {:?} on {:?}>",
31            self.at, self.timeline,
32        ))
33    }
34}
35
36impl LatestAtQuery {
37    /// The returned query is guaranteed to never include [`TimeInt::STATIC`].
38    #[inline]
39    pub fn new(timeline: TimelineName, at: impl TryInto<TimeInt>) -> Self {
40        Self {
41            timeline,
42            at: TimeInt::saturated_temporal(at),
43        }
44    }
45
46    #[inline]
47    pub const fn latest(timeline: TimelineName) -> Self {
48        Self {
49            timeline,
50            at: TimeInt::MAX,
51        }
52    }
53
54    #[inline]
55    pub fn timeline(&self) -> TimelineName {
56        self.timeline
57    }
58
59    #[inline]
60    pub fn at(&self) -> TimeInt {
61        self.at
62    }
63}
64
65// ---
66
67impl Chunk {
68    /// Runs a [`LatestAtQuery`] filter on a [`Chunk`].
69    ///
70    /// This behaves as a row-based filter: the result is a new [`Chunk`] that is vertically
71    /// sliced to only contain the row relevant for the specified `query`.
72    ///
73    /// The resulting [`Chunk`] is guaranteed to contain all the same columns has the queried
74    /// chunk: there is no horizontal slicing going on.
75    ///
76    /// An empty [`Chunk`] (i.e. 0 rows, but N columns) is returned if the `query` yields nothing.
77    ///
78    /// Because the resulting chunk doesn't discard any column information, you can find extra relevant
79    /// information by inspecting the data, for examples timestamps on other timelines.
80    /// See [`Self::timeline_sliced`] and [`Self::component_sliced`] if you do want to filter this
81    /// extra data.
82    pub fn latest_at(&self, query: &LatestAtQuery, component: ComponentIdentifier) -> Self {
83        if self.is_empty() {
84            return self.clone();
85        }
86
87        re_tracing::profile_function!(format!("{query:?}"));
88
89        let Some(component_list_array) = self.components.get_array(component) else {
90            return self.emptied();
91        };
92
93        let mut index = None;
94
95        let is_static = self.is_static();
96        let is_sorted_by_row_id = self.is_sorted();
97
98        if is_static {
99            if is_sorted_by_row_id {
100                // Static, row-sorted chunk
101
102                for i in (0..self.num_rows()).rev() {
103                    if !component_list_array.is_valid(i) {
104                        continue;
105                    }
106
107                    index = Some(i);
108                    break;
109                }
110            } else {
111                // Static, row-unsorted chunk
112
113                let mut closest_row_id = RowId::ZERO;
114
115                for (i, row_id) in self.row_ids().enumerate() {
116                    if !component_list_array.is_valid(i) {
117                        continue;
118                    }
119
120                    let is_closer_row_id = row_id > closest_row_id;
121
122                    if is_closer_row_id {
123                        closest_row_id = row_id;
124                        index = Some(i);
125                    }
126                }
127            }
128        } else {
129            let Some(time_column) = self.timelines.get(&query.timeline()) else {
130                return self.emptied();
131            };
132
133            let is_sorted_by_time = time_column.is_sorted();
134            let times = time_column.times_raw();
135
136            if is_sorted_by_time {
137                // Temporal, row-sorted, time-sorted chunk
138
139                let i = times
140                    .partition_point(|&time| time <= query.at().as_i64())
141                    .saturating_sub(1);
142
143                for i in (0..=i).rev() {
144                    if !component_list_array.is_valid(i) {
145                        continue;
146                    }
147
148                    index = Some(i);
149                    break;
150                }
151            } else {
152                // Temporal, unsorted chunk
153
154                let mut closest_data_time = TimeInt::MIN;
155                let mut closest_row_id = RowId::ZERO;
156
157                for (i, row_id) in self.row_ids().enumerate() {
158                    if !component_list_array.is_valid(i) {
159                        continue;
160                    }
161
162                    let data_time = TimeInt::new_temporal(times[i]);
163
164                    let is_closer_time = data_time > closest_data_time && data_time <= query.at();
165                    let is_same_time_but_closer_row_id =
166                        data_time == closest_data_time && row_id > closest_row_id;
167
168                    if is_closer_time || is_same_time_but_closer_row_id {
169                        closest_data_time = data_time;
170                        closest_row_id = row_id;
171                        index = Some(i);
172                    }
173                }
174            }
175        }
176
177        index.map_or_else(|| self.emptied(), |i| self.row_sliced_shallow(i, 1))
178    }
179}