1use nautilus_core::{RowAccess, Value};
4use rustc_hash::FxHasher;
5use smallvec::SmallVec;
6use std::collections::HashMap;
7use std::hash::{BuildHasherDefault, Hasher};
8use std::sync::{Arc, OnceLock};
9
10const LINEAR_SCAN_LOOKUP_THRESHOLD: usize = 8;
11const INLINE_ROW_COLUMN_CAPACITY: usize = 8;
12
13type NameIndexMap = HashMap<u64, NameIndexEntry, BuildHasherDefault<U64IdentityHasher>>;
14type RowColumns = SmallVec<[(Arc<str>, Value); INLINE_ROW_COLUMN_CAPACITY]>;
15
16#[derive(Debug)]
17enum NameIndexEntry {
18 Single(usize),
19 Multiple(Vec<usize>),
20}
21
22#[derive(Debug)]
23struct RowNameIndex {
24 entries: NameIndexMap,
25}
26
27#[derive(Default)]
28struct U64IdentityHasher(u64);
29
30impl Hasher for U64IdentityHasher {
31 fn finish(&self) -> u64 {
32 self.0
33 }
34
35 fn write(&mut self, bytes: &[u8]) {
36 let mut hasher = FxHasher::default();
37 hasher.write(bytes);
38 self.0 = hasher.finish();
39 }
40
41 fn write_u64(&mut self, value: u64) {
42 self.0 = value;
43 }
44}
45
46impl RowNameIndex {
47 fn new(columns: &[(Arc<str>, Value)]) -> Self {
48 let mut entries =
49 NameIndexMap::with_capacity_and_hasher(columns.len(), BuildHasherDefault::default());
50
51 for (idx, (name, _)) in columns.iter().enumerate() {
52 let hash = hash_column_name(name);
53 match entries.entry(hash) {
54 std::collections::hash_map::Entry::Vacant(entry) => {
55 entry.insert(NameIndexEntry::Single(idx));
56 }
57 std::collections::hash_map::Entry::Occupied(mut entry) => match entry.get_mut() {
58 NameIndexEntry::Single(first_idx) => {
59 let existing = *first_idx;
60 entry.insert(NameIndexEntry::Multiple(vec![existing, idx]));
61 }
62 NameIndexEntry::Multiple(indices) => indices.push(idx),
63 },
64 }
65 }
66
67 Self { entries }
68 }
69
70 fn find(&self, columns: &[(Arc<str>, Value)], name: &str) -> Option<usize> {
71 self.find_hashed(columns, hash_column_name(name), name)
72 }
73
74 fn find_hashed(&self, columns: &[(Arc<str>, Value)], hash: u64, name: &str) -> Option<usize> {
75 let entry = self.entries.get(&hash)?;
76 match entry {
77 NameIndexEntry::Single(idx) => {
78 let (column_name, _) = columns.get(*idx)?;
79 (column_name.as_ref() == name).then_some(*idx)
80 }
81 NameIndexEntry::Multiple(indices) => indices.iter().copied().find(|idx| {
82 columns
83 .get(*idx)
84 .is_some_and(|(column_name, _)| column_name.as_ref() == name)
85 }),
86 }
87 }
88}
89
90fn hash_column_name(name: &str) -> u64 {
92 let mut hasher = FxHasher::default();
93 hasher.write(name.as_bytes());
94 hasher.finish()
95}
96
97#[derive(Debug)]
110pub struct Row {
111 columns: RowColumns,
112 index: OnceLock<RowNameIndex>,
113}
114
115impl Row {
116 pub fn new(columns: Vec<(String, Value)>) -> Self {
118 Self {
119 columns: columns
120 .into_iter()
121 .map(|(name, value)| (Arc::from(name), value))
122 .collect(),
123 index: OnceLock::new(),
124 }
125 }
126
127 pub fn with_capacity(capacity: usize) -> Self {
129 Self {
130 columns: SmallVec::with_capacity(capacity),
131 index: OnceLock::new(),
132 }
133 }
134
135 pub fn push_column(&mut self, name: impl Into<Arc<str>>, value: Value) {
142 self.columns.push((name.into(), value));
143 self.index = OnceLock::new();
144 }
145
146 pub fn get_by_pos(&self, idx: usize) -> Option<&Value> {
148 self.columns.get(idx).map(|(_, v)| v)
149 }
150
151 pub fn get(&self, name: &str) -> Option<&Value> {
156 if self.columns.len() <= LINEAR_SCAN_LOOKUP_THRESHOLD {
157 return self
158 .columns
159 .iter()
160 .find(|(column_name, _)| column_name.as_ref() == name)
161 .map(|(_, value)| value);
162 }
163
164 let index = self.index.get_or_init(|| RowNameIndex::new(&self.columns));
165 index
166 .find(&self.columns, name)
167 .and_then(|idx| self.get_by_pos(idx))
168 }
169
170 pub fn column_name(&self, idx: usize) -> Option<&str> {
172 self.columns.get(idx).map(|(name, _)| name.as_ref())
173 }
174
175 pub fn iter(&self) -> impl Iterator<Item = (&str, &Value)> {
177 self.columns.iter().map(|(name, val)| (name.as_ref(), val))
178 }
179
180 pub fn len(&self) -> usize {
182 self.columns.len()
183 }
184
185 pub fn is_empty(&self) -> bool {
187 self.columns.is_empty()
188 }
189
190 pub fn columns(&self) -> &[(Arc<str>, Value)] {
192 &self.columns
193 }
194
195 pub fn into_columns_iter(self) -> impl Iterator<Item = (Arc<str>, Value)> {
198 self.columns.into_iter()
199 }
200
201 pub fn into_columns(self) -> Vec<(Arc<str>, Value)> {
203 self.columns.into_vec()
204 }
205}
206
207impl<'row> RowAccess<'row> for Row {
212 fn get(&'row self, name: &str) -> Option<&'row Value> {
213 Row::get(self, name)
214 }
215
216 fn get_by_pos(&'row self, idx: usize) -> Option<&'row Value> {
217 Row::get_by_pos(self, idx)
218 }
219
220 fn column_name(&'row self, idx: usize) -> Option<&'row str> {
221 Row::column_name(self, idx)
222 }
223
224 fn len(&self) -> usize {
225 Row::len(self)
226 }
227
228 fn is_empty(&self) -> bool {
229 Row::is_empty(self)
230 }
231}
232
233#[cfg(test)]
234mod tests {
235 use super::*;
236 use nautilus_core::Value;
237
238 #[test]
239 fn test_row_positional_access() {
240 let row = Row::new(vec![
241 ("id".to_string(), Value::I64(1)),
242 ("name".to_string(), Value::String("Alice".to_string())),
243 ]);
244
245 assert_eq!(row.get_by_pos(0), Some(&Value::I64(1)));
246 assert_eq!(row.get_by_pos(1), Some(&Value::String("Alice".to_string())));
247 assert_eq!(row.get_by_pos(2), None);
248 }
249
250 #[test]
251 fn test_row_named_access() {
252 let row = Row::new(vec![
253 ("id".to_string(), Value::I64(1)),
254 ("name".to_string(), Value::String("Alice".to_string())),
255 ]);
256
257 assert_eq!(row.get("id"), Some(&Value::I64(1)));
258 assert_eq!(row.get("name"), Some(&Value::String("Alice".to_string())));
259 assert_eq!(row.get("age"), None);
260 }
261
262 #[test]
263 fn test_row_duplicate_columns() {
264 let row = Row::new(vec![
265 ("id".to_string(), Value::I64(1)),
266 ("id".to_string(), Value::I64(2)),
267 ("name".to_string(), Value::String("Alice".to_string())),
268 ]);
269
270 assert_eq!(row.get("id"), Some(&Value::I64(1)));
271 assert_eq!(row.get_by_pos(0), Some(&Value::I64(1)));
272 assert_eq!(row.get_by_pos(1), Some(&Value::I64(2)));
273 }
274
275 #[test]
276 fn test_row_iterator() {
277 let row = Row::new(vec![
278 ("id".to_string(), Value::I64(1)),
279 ("name".to_string(), Value::String("Alice".to_string())),
280 ]);
281
282 let items: Vec<_> = row.iter().collect();
283 assert_eq!(items.len(), 2);
284 assert_eq!(items[0], ("id", &Value::I64(1)));
285 assert_eq!(items[1], ("name", &Value::String("Alice".to_string())));
286 }
287
288 #[test]
289 fn test_row_empty() {
290 let row = Row::new(vec![]);
291 assert!(row.is_empty());
292 assert_eq!(row.len(), 0);
293 assert_eq!(row.get_by_pos(0), None);
294 assert_eq!(row.get("any"), None);
295 }
296
297 #[test]
298 fn test_row_column_name() {
299 let row = Row::new(vec![
300 ("id".to_string(), Value::I64(1)),
301 ("name".to_string(), Value::String("Alice".to_string())),
302 ]);
303
304 assert_eq!(row.column_name(0), Some("id"));
305 assert_eq!(row.column_name(1), Some("name"));
306 assert_eq!(row.column_name(2), None);
307 }
308
309 #[test]
310 fn test_row_columns_slice() {
311 let row = Row::new(vec![
312 ("x".to_string(), Value::I64(10)),
313 ("y".to_string(), Value::Bool(false)),
314 ]);
315
316 let cols = row.columns();
317 assert_eq!(cols.len(), 2);
318 assert_eq!(cols[0].0.as_ref(), "x");
319 assert_eq!(cols[0].1, Value::I64(10));
320 assert_eq!(cols[1].0.as_ref(), "y");
321 assert_eq!(cols[1].1, Value::Bool(false));
322 }
323
324 #[test]
325 fn test_row_wide_named_access_uses_index_without_cloning_names() {
326 let mut columns = Vec::new();
327 for idx in 0..=LINEAR_SCAN_LOOKUP_THRESHOLD {
328 columns.push((format!("col_{idx}"), Value::I64(idx as i64)));
329 }
330 let row = Row::new(columns);
331
332 assert_eq!(
333 row.get(&format!("col_{}", LINEAR_SCAN_LOOKUP_THRESHOLD)),
334 Some(&Value::I64(LINEAR_SCAN_LOOKUP_THRESHOLD as i64))
335 );
336 assert_eq!(row.get("missing"), None);
337 }
338
339 #[test]
340 fn test_row_name_index_disambiguates_colliding_candidates() {
341 let columns: Vec<(Arc<str>, Value)> = vec![
342 (Arc::from("first"), Value::I64(1)),
343 (Arc::from("second"), Value::I64(2)),
344 ];
345 let mut entries = NameIndexMap::with_capacity_and_hasher(1, BuildHasherDefault::default());
346 entries.insert(42, NameIndexEntry::Multiple(vec![0, 1]));
347 let index = RowNameIndex { entries };
348
349 assert_eq!(index.find_hashed(&columns, 42, "first"), Some(0));
350 assert_eq!(index.find_hashed(&columns, 42, "second"), Some(1));
351 assert_eq!(index.find_hashed(&columns, 42, "third"), None);
352 }
353}