// nautilus_connector/row.rs

//! Row representation for database results.
2
3use nautilus_core::{RowAccess, Value};
4use rustc_hash::FxHasher;
5use smallvec::SmallVec;
6use std::collections::HashMap;
7use std::hash::{BuildHasherDefault, Hasher};
8use std::sync::{Arc, OnceLock};
9
10const LINEAR_SCAN_LOOKUP_THRESHOLD: usize = 8;
11const INLINE_ROW_COLUMN_CAPACITY: usize = 8;
12
13type NameIndexMap = HashMap<u64, NameIndexEntry, BuildHasherDefault<U64IdentityHasher>>;
14type RowColumns = SmallVec<[(Arc<str>, Value); INLINE_ROW_COLUMN_CAPACITY]>;
15
/// Column position(s) recorded under a single name hash.
#[derive(Debug)]
enum NameIndexEntry {
    /// Exactly one column produced this hash; holds its position.
    Single(usize),
    /// Several columns produced this hash (duplicate names or hash
    /// collisions); holds their positions in the order they were recorded.
    Multiple(Vec<usize>),
}
21
22#[derive(Debug)]
23struct RowNameIndex {
24    entries: NameIndexMap,
25}
26
/// Hasher state for maps whose keys are already 64-bit hashes.
///
/// The wrapped value is the hash reported by `finish`; it starts at zero.
#[derive(Default)]
struct U64IdentityHasher(u64);
29
30impl Hasher for U64IdentityHasher {
31    fn finish(&self) -> u64 {
32        self.0
33    }
34
35    fn write(&mut self, bytes: &[u8]) {
36        let mut hasher = FxHasher::default();
37        hasher.write(bytes);
38        self.0 = hasher.finish();
39    }
40
41    fn write_u64(&mut self, value: u64) {
42        self.0 = value;
43    }
44}
45
46impl RowNameIndex {
47    fn new(columns: &[(Arc<str>, Value)]) -> Self {
48        let mut entries =
49            NameIndexMap::with_capacity_and_hasher(columns.len(), BuildHasherDefault::default());
50
51        for (idx, (name, _)) in columns.iter().enumerate() {
52            let hash = hash_column_name(name);
53            match entries.entry(hash) {
54                std::collections::hash_map::Entry::Vacant(entry) => {
55                    entry.insert(NameIndexEntry::Single(idx));
56                }
57                std::collections::hash_map::Entry::Occupied(mut entry) => match entry.get_mut() {
58                    NameIndexEntry::Single(first_idx) => {
59                        let existing = *first_idx;
60                        entry.insert(NameIndexEntry::Multiple(vec![existing, idx]));
61                    }
62                    NameIndexEntry::Multiple(indices) => indices.push(idx),
63                },
64            }
65        }
66
67        Self { entries }
68    }
69
70    fn find(&self, columns: &[(Arc<str>, Value)], name: &str) -> Option<usize> {
71        self.find_hashed(columns, hash_column_name(name), name)
72    }
73
74    fn find_hashed(&self, columns: &[(Arc<str>, Value)], hash: u64, name: &str) -> Option<usize> {
75        let entry = self.entries.get(&hash)?;
76        match entry {
77            NameIndexEntry::Single(idx) => {
78                let (column_name, _) = columns.get(*idx)?;
79                (column_name.as_ref() == name).then_some(*idx)
80            }
81            NameIndexEntry::Multiple(indices) => indices.iter().copied().find(|idx| {
82                columns
83                    .get(*idx)
84                    .is_some_and(|(column_name, _)| column_name.as_ref() == name)
85            }),
86        }
87    }
88}
89
90/// Hash a column name with `rustc-hash`'s lightweight `FxHasher`.
91fn hash_column_name(name: &str) -> u64 {
92    let mut hasher = FxHasher::default();
93    hasher.write(name.as_bytes());
94    hasher.finish()
95}
96
97/// A database row with hybrid access patterns.
98///
99/// Stores columns as ordered `(Arc<str>, Value)` pairs, preserving duplicates.
100/// Column names are `Arc<str>` so that decoders can build them once per
101/// statement and share them across every row of a result set (an `Arc` clone
102/// instead of a fresh heap `String` per cell). Small rows use a linear scan
103/// to avoid index-allocation overhead; wider rows lazily build a compact
104/// name-to-index map on first `get()` call.
105///
106/// ## Duplicate Column Policy
107///
108/// If multiple columns have the same name, `get(name)` returns the first occurrence.
109#[derive(Debug)]
110pub struct Row {
111    columns: RowColumns,
112    index: OnceLock<RowNameIndex>,
113}
114
115impl Row {
116    /// Create a new row from column-value pairs.
117    pub fn new(columns: Vec<(String, Value)>) -> Self {
118        Self {
119            columns: columns
120                .into_iter()
121                .map(|(name, value)| (Arc::from(name), value))
122                .collect(),
123            index: OnceLock::new(),
124        }
125    }
126
127    /// Create an empty row with enough capacity for the expected column count.
128    pub fn with_capacity(capacity: usize) -> Self {
129        Self {
130            columns: SmallVec::with_capacity(capacity),
131            index: OnceLock::new(),
132        }
133    }
134
135    /// Append a column while constructing or reshaping a row.
136    ///
137    /// Accepts anything convertible into `Arc<str>`: decoders pass shared
138    /// `Arc<str>` clones (no allocation), reshaping code can keep passing
139    /// `String`s. This invalidates the lazy name index so subsequent lookups
140    /// stay correct.
141    pub fn push_column(&mut self, name: impl Into<Arc<str>>, value: Value) {
142        self.columns.push((name.into(), value));
143        self.index = OnceLock::new();
144    }
145
146    /// Get a value by column position (0-indexed).
147    pub fn get_by_pos(&self, idx: usize) -> Option<&Value> {
148        self.columns.get(idx).map(|(_, v)| v)
149    }
150
151    /// Get a value by column name.
152    ///
153    /// Narrow rows use a direct scan. Wider rows lazily build an index on the
154    /// first lookup. If duplicate columns exist, returns the first occurrence.
155    pub fn get(&self, name: &str) -> Option<&Value> {
156        if self.columns.len() <= LINEAR_SCAN_LOOKUP_THRESHOLD {
157            return self
158                .columns
159                .iter()
160                .find(|(column_name, _)| column_name.as_ref() == name)
161                .map(|(_, value)| value);
162        }
163
164        let index = self.index.get_or_init(|| RowNameIndex::new(&self.columns));
165        index
166            .find(&self.columns, name)
167            .and_then(|idx| self.get_by_pos(idx))
168    }
169
170    /// Get the column name at the given position.
171    pub fn column_name(&self, idx: usize) -> Option<&str> {
172        self.columns.get(idx).map(|(name, _)| name.as_ref())
173    }
174
175    /// Iterate over all columns as `(name, value)` pairs.
176    pub fn iter(&self) -> impl Iterator<Item = (&str, &Value)> {
177        self.columns.iter().map(|(name, val)| (name.as_ref(), val))
178    }
179
180    /// Return the number of columns.
181    pub fn len(&self) -> usize {
182        self.columns.len()
183    }
184
185    /// Check if the row has no columns.
186    pub fn is_empty(&self) -> bool {
187        self.columns.is_empty()
188    }
189
190    /// Get all columns as a slice.
191    pub fn columns(&self) -> &[(Arc<str>, Value)] {
192        &self.columns
193    }
194
195    /// Consume the row and iterate over owned `(name, value)` pairs without
196    /// forcing the internal storage back into a `Vec`.
197    pub fn into_columns_iter(self) -> impl Iterator<Item = (Arc<str>, Value)> {
198        self.columns.into_iter()
199    }
200
201    /// Consume the row and return the owned columns.
202    pub fn into_columns(self) -> Vec<(Arc<str>, Value)> {
203        self.columns.into_vec()
204    }
205}
206
207/// Implement RowAccess trait for the owned Row type.
208///
209/// This allows Row to be used with the GAT-based executor interface
210/// while maintaining backward compatibility with existing code.
211impl<'row> RowAccess<'row> for Row {
212    fn get(&'row self, name: &str) -> Option<&'row Value> {
213        Row::get(self, name)
214    }
215
216    fn get_by_pos(&'row self, idx: usize) -> Option<&'row Value> {
217        Row::get_by_pos(self, idx)
218    }
219
220    fn column_name(&'row self, idx: usize) -> Option<&'row str> {
221        Row::column_name(self, idx)
222    }
223
224    fn len(&self) -> usize {
225        Row::len(self)
226    }
227
228    fn is_empty(&self) -> bool {
229        Row::is_empty(self)
230    }
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use nautilus_core::Value;

    /// The string value used for the `name` column in the fixtures.
    fn alice() -> Value {
        Value::String(String::from("Alice"))
    }

    /// Two-column fixture: `id = 1`, `name = "Alice"`.
    fn id_name_row() -> Row {
        Row::new(vec![
            (String::from("id"), Value::I64(1)),
            (String::from("name"), alice()),
        ])
    }

    #[test]
    fn test_row_positional_access() {
        let row = id_name_row();

        assert_eq!(row.get_by_pos(0), Some(&Value::I64(1)));
        assert_eq!(row.get_by_pos(1), Some(&alice()));
        assert_eq!(row.get_by_pos(2), None);
    }

    #[test]
    fn test_row_named_access() {
        let row = id_name_row();

        assert_eq!(row.get("id"), Some(&Value::I64(1)));
        assert_eq!(row.get("name"), Some(&alice()));
        assert_eq!(row.get("age"), None);
    }

    #[test]
    fn test_row_duplicate_columns() {
        let row = Row::new(vec![
            (String::from("id"), Value::I64(1)),
            (String::from("id"), Value::I64(2)),
            (String::from("name"), alice()),
        ]);

        // Name lookup resolves to the first duplicate; positions reach both.
        assert_eq!(row.get("id"), Some(&Value::I64(1)));
        assert_eq!(row.get_by_pos(0), Some(&Value::I64(1)));
        assert_eq!(row.get_by_pos(1), Some(&Value::I64(2)));
    }

    #[test]
    fn test_row_iterator() {
        let row = id_name_row();

        let items: Vec<_> = row.iter().collect();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0], ("id", &Value::I64(1)));
        assert_eq!(items[1], ("name", &alice()));
    }

    #[test]
    fn test_row_empty() {
        let row = Row::new(Vec::new());

        assert!(row.is_empty());
        assert_eq!(row.len(), 0);
        assert_eq!(row.get_by_pos(0), None);
        assert_eq!(row.get("any"), None);
    }

    #[test]
    fn test_row_column_name() {
        let row = id_name_row();

        assert_eq!(row.column_name(0), Some("id"));
        assert_eq!(row.column_name(1), Some("name"));
        assert_eq!(row.column_name(2), None);
    }

    #[test]
    fn test_row_columns_slice() {
        let row = Row::new(vec![
            (String::from("x"), Value::I64(10)),
            (String::from("y"), Value::Bool(false)),
        ]);

        let cols = row.columns();
        assert_eq!(cols.len(), 2);
        assert_eq!(cols[0].0.as_ref(), "x");
        assert_eq!(cols[0].1, Value::I64(10));
        assert_eq!(cols[1].0.as_ref(), "y");
        assert_eq!(cols[1].1, Value::Bool(false));
    }

    #[test]
    fn test_row_wide_named_access_uses_index_without_cloning_names() {
        // One column more than the linear-scan threshold, so `get` takes the
        // indexed path.
        let columns: Vec<(String, Value)> = (0..=LINEAR_SCAN_LOOKUP_THRESHOLD)
            .map(|idx| (format!("col_{idx}"), Value::I64(idx as i64)))
            .collect();
        let row = Row::new(columns);

        let last_name = format!("col_{}", LINEAR_SCAN_LOOKUP_THRESHOLD);
        assert_eq!(
            row.get(&last_name),
            Some(&Value::I64(LINEAR_SCAN_LOOKUP_THRESHOLD as i64))
        );
        assert_eq!(row.get("missing"), None);
    }

    #[test]
    fn test_row_name_index_disambiguates_colliding_candidates() {
        let columns: Vec<(Arc<str>, Value)> = vec![
            (Arc::from("first"), Value::I64(1)),
            (Arc::from("second"), Value::I64(2)),
        ];

        // Hand-build an index in which both columns share the hash 42.
        let mut entries = NameIndexMap::default();
        entries.insert(42, NameIndexEntry::Multiple(vec![0, 1]));
        let index = RowNameIndex { entries };

        assert_eq!(index.find_hashed(&columns, 42, "first"), Some(0));
        assert_eq!(index.find_hashed(&columns, 42, "second"), Some(1));
        assert_eq!(index.find_hashed(&columns, 42, "third"), None);
    }
}
353}