re_chunk/
latest_at.rs

1use arrow::array::Array as _;
2
3use re_log_types::{TimeInt, TimelineName};
4use re_types_core::ComponentDescriptor;
5
6use crate::{Chunk, RowId};
7
8// ---
9
10/// A query at a given time, for a given timeline.
11///
12/// Get the latest version of the data available at this time.
13#[derive(Clone, PartialEq, Eq, Hash)]
14pub struct LatestAtQuery {
15    timeline: TimelineName,
16    at: TimeInt,
17}
18
19impl std::fmt::Debug for LatestAtQuery {
20    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21        f.write_fmt(format_args!(
22            "<latest-at {:?} on {:?}>",
23            self.at, self.timeline,
24        ))
25    }
26}
27
28impl LatestAtQuery {
29    /// The returned query is guaranteed to never include [`TimeInt::STATIC`].
30    #[inline]
31    pub fn new(timeline: TimelineName, at: impl TryInto<TimeInt>) -> Self {
32        Self {
33            timeline,
34            at: TimeInt::saturated_temporal(at),
35        }
36    }
37
38    #[inline]
39    pub const fn latest(timeline: TimelineName) -> Self {
40        Self {
41            timeline,
42            at: TimeInt::MAX,
43        }
44    }
45
46    #[inline]
47    pub fn timeline(&self) -> TimelineName {
48        self.timeline
49    }
50
51    #[inline]
52    pub fn at(&self) -> TimeInt {
53        self.at
54    }
55}
56
57// ---
58
59impl Chunk {
60    /// Runs a [`LatestAtQuery`] filter on a [`Chunk`].
61    ///
62    /// This behaves as a row-based filter: the result is a new [`Chunk`] that is vertically
63    /// sliced to only contain the row relevant for the specified `query`.
64    ///
65    /// The resulting [`Chunk`] is guaranteed to contain all the same columns has the queried
66    /// chunk: there is no horizontal slicing going on.
67    ///
68    /// An empty [`Chunk`] (i.e. 0 rows, but N columns) is returned if the `query` yields nothing.
69    ///
70    /// Because the resulting chunk doesn't discard any column information, you can find extra relevant
71    /// information by inspecting the data, for examples timestamps on other timelines.
72    /// See [`Self::timeline_sliced`] and [`Self::component_sliced`] if you do want to filter this
73    /// extra data.
74    pub fn latest_at(&self, query: &LatestAtQuery, component_descr: &ComponentDescriptor) -> Self {
75        if self.is_empty() {
76            return self.clone();
77        }
78
79        re_tracing::profile_function!(format!("{query:?}"));
80
81        let Some(component_list_array) = self.components.get(component_descr) else {
82            return self.emptied();
83        };
84
85        let mut index = None;
86
87        let is_static = self.is_static();
88        let is_sorted_by_row_id = self.is_sorted();
89
90        if is_static {
91            if is_sorted_by_row_id {
92                // Static, row-sorted chunk
93
94                for i in (0..self.num_rows()).rev() {
95                    if !component_list_array.is_valid(i) {
96                        continue;
97                    }
98
99                    index = Some(i);
100                    break;
101                }
102            } else {
103                // Static, row-unsorted chunk
104
105                let mut closest_row_id = RowId::ZERO;
106
107                for (i, row_id) in self.row_ids().enumerate() {
108                    if !component_list_array.is_valid(i) {
109                        continue;
110                    }
111
112                    let is_closer_row_id = row_id > closest_row_id;
113
114                    if is_closer_row_id {
115                        closest_row_id = row_id;
116                        index = Some(i);
117                    }
118                }
119            }
120        } else {
121            let Some(time_column) = self.timelines.get(&query.timeline()) else {
122                return self.emptied();
123            };
124
125            let is_sorted_by_time = time_column.is_sorted();
126            let times = time_column.times_raw();
127
128            if is_sorted_by_time {
129                // Temporal, row-sorted, time-sorted chunk
130
131                let i = times
132                    .partition_point(|&time| time <= query.at().as_i64())
133                    .saturating_sub(1);
134
135                for i in (0..=i).rev() {
136                    if !component_list_array.is_valid(i) {
137                        continue;
138                    }
139
140                    index = Some(i);
141                    break;
142                }
143            } else {
144                // Temporal, unsorted chunk
145
146                let mut closest_data_time = TimeInt::MIN;
147                let mut closest_row_id = RowId::ZERO;
148
149                for (i, row_id) in self.row_ids().enumerate() {
150                    if !component_list_array.is_valid(i) {
151                        continue;
152                    }
153
154                    let data_time = TimeInt::new_temporal(times[i]);
155
156                    let is_closer_time = data_time > closest_data_time && data_time <= query.at();
157                    let is_same_time_but_closer_row_id =
158                        data_time == closest_data_time && row_id > closest_row_id;
159
160                    if is_closer_time || is_same_time_but_closer_row_id {
161                        closest_data_time = data_time;
162                        closest_row_id = row_id;
163                        index = Some(i);
164                    }
165                }
166            }
167        }
168
169        index.map_or_else(|| self.emptied(), |i| self.row_sliced(i, 1))
170    }
171}