use nautilus_core::{RowAccess, Value};
use rustc_hash::FxHasher;
use smallvec::SmallVec;
use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};
use std::sync::{Arc, OnceLock};
// Rows with at most this many columns are searched by a plain linear scan in
// `Row::get`; only wider rows build and consult the hashed name index.
const LINEAR_SCAN_LOOKUP_THRESHOLD: usize = 8;
// Length of the inline array backing `RowColumns`.
const INLINE_ROW_COLUMN_CAPACITY: usize = 8;
// Maps the precomputed 64-bit hash of a column name to the column position(s)
// carrying that hash. Keys are already hashes, so the map is built with
// `U64IdentityHasher` instead of the default hasher.
type NameIndexMap = HashMap<u64, NameIndexEntry, BuildHasherDefault<U64IdentityHasher>>;
// Ordered `(column name, value)` pairs of a row.
type RowColumns = SmallVec<[(Arc<str>, Value); INLINE_ROW_COLUMN_CAPACITY]>;
/// Column position(s) registered under one name hash in a `RowNameIndex`.
#[derive(Debug)]
enum NameIndexEntry {
/// Exactly one column produced this hash; holds its position.
Single(usize),
/// Several columns produced this hash (repeated names or distinct names
/// that collide); positions are stored in the order they were indexed.
Multiple(Vec<usize>),
}
/// Hash-based lookup table from column-name hash to column position(s).
///
/// The index stores positions only, never the names themselves; lookups take
/// the column slice as an argument and compare the candidate names against it.
#[derive(Debug)]
struct RowNameIndex {
// Keyed by `hash_column_name(name)`.
entries: NameIndexMap,
}
/// Hasher state for `NameIndexMap`: a single `u64` that `finish` returns as-is.
#[derive(Default)]
struct U64IdentityHasher(u64);
impl Hasher for U64IdentityHasher {
fn finish(&self) -> u64 {
self.0
}
fn write(&mut self, bytes: &[u8]) {
let mut hasher = FxHasher::default();
hasher.write(bytes);
self.0 = hasher.finish();
}
fn write_u64(&mut self, value: u64) {
self.0 = value;
}
}
impl RowNameIndex {
    /// Builds an index over `columns`, registering every column position
    /// under the hash of its name.
    ///
    /// Positions that share a hash (repeated names or colliding names) are
    /// kept together in column order.
    fn new(columns: &[(Arc<str>, Value)]) -> Self {
        use std::collections::hash_map::Entry;

        let mut entries =
            NameIndexMap::with_capacity_and_hasher(columns.len(), BuildHasherDefault::default());
        for (position, (column_name, _)) in columns.iter().enumerate() {
            match entries.entry(hash_column_name(column_name)) {
                Entry::Vacant(slot) => {
                    slot.insert(NameIndexEntry::Single(position));
                }
                Entry::Occupied(mut slot) => {
                    let bucket = slot.get_mut();
                    match *bucket {
                        // Second column with this hash: promote to a list
                        // that keeps the earlier position first.
                        NameIndexEntry::Single(first) => {
                            *bucket = NameIndexEntry::Multiple(vec![first, position]);
                        }
                        NameIndexEntry::Multiple(ref mut positions) => positions.push(position),
                    }
                }
            }
        }
        Self { entries }
    }

    /// Returns the position of the first column called `name`, if any.
    ///
    /// `columns` must be the slice this index was built from.
    fn find(&self, columns: &[(Arc<str>, Value)], name: &str) -> Option<usize> {
        let hash = hash_column_name(name);
        self.find_hashed(columns, hash, name)
    }

    /// Same as `find`, but with the name hash supplied by the caller.
    ///
    /// Every candidate position stored under `hash` is checked against the
    /// actual column name, so a shared hash never yields a wrong column.
    /// Positions outside `columns` are treated as non-matching.
    fn find_hashed(&self, columns: &[(Arc<str>, Value)], hash: u64, name: &str) -> Option<usize> {
        let names_match = |position: usize| {
            matches!(columns.get(position), Some((candidate, _)) if candidate.as_ref() == name)
        };
        match self.entries.get(&hash)? {
            NameIndexEntry::Single(position) => names_match(*position).then_some(*position),
            NameIndexEntry::Multiple(positions) => positions
                .iter()
                .copied()
                .find(|&position| names_match(position)),
        }
    }
}
/// Hashes a column name with `FxHasher`.
///
/// The name's raw bytes are written directly to the hasher. Index
/// construction and lookup both go through this function, so their keys agree.
fn hash_column_name(name: &str) -> u64 {
    let mut state = FxHasher::default();
    Hasher::write(&mut state, name.as_bytes());
    Hasher::finish(&state)
}
/// An ordered collection of named column values.
///
/// Columns keep their insertion order and may repeat a name; name-based
/// lookup returns the first column with that name.
#[derive(Debug)]
pub struct Row {
// `(name, value)` pairs in column order.
columns: RowColumns,
// Name index, built on the first `get` call for rows wider than
// `LINEAR_SCAN_LOOKUP_THRESHOLD` and reset whenever a column is pushed.
index: OnceLock<RowNameIndex>,
}
impl Row {
pub fn new(columns: Vec<(String, Value)>) -> Self {
Self {
columns: columns
.into_iter()
.map(|(name, value)| (Arc::from(name), value))
.collect(),
index: OnceLock::new(),
}
}
pub fn with_capacity(capacity: usize) -> Self {
Self {
columns: SmallVec::with_capacity(capacity),
index: OnceLock::new(),
}
}
pub fn push_column(&mut self, name: impl Into<Arc<str>>, value: Value) {
self.columns.push((name.into(), value));
self.index = OnceLock::new();
}
pub fn get_by_pos(&self, idx: usize) -> Option<&Value> {
self.columns.get(idx).map(|(_, v)| v)
}
pub fn get(&self, name: &str) -> Option<&Value> {
if self.columns.len() <= LINEAR_SCAN_LOOKUP_THRESHOLD {
return self
.columns
.iter()
.find(|(column_name, _)| column_name.as_ref() == name)
.map(|(_, value)| value);
}
let index = self.index.get_or_init(|| RowNameIndex::new(&self.columns));
index
.find(&self.columns, name)
.and_then(|idx| self.get_by_pos(idx))
}
pub fn column_name(&self, idx: usize) -> Option<&str> {
self.columns.get(idx).map(|(name, _)| name.as_ref())
}
pub fn iter(&self) -> impl Iterator<Item = (&str, &Value)> {
self.columns.iter().map(|(name, val)| (name.as_ref(), val))
}
pub fn len(&self) -> usize {
self.columns.len()
}
pub fn is_empty(&self) -> bool {
self.columns.is_empty()
}
pub fn columns(&self) -> &[(Arc<str>, Value)] {
&self.columns
}
pub fn into_columns_iter(self) -> impl Iterator<Item = (Arc<str>, Value)> {
self.columns.into_iter()
}
pub fn into_columns(self) -> Vec<(Arc<str>, Value)> {
self.columns.into_vec()
}
}
/// `RowAccess` implementation for `Row`; every method forwards to the
/// inherent `Row` method of the same name.
///
/// Calls are written as `Row::method(self, ..)`: in a type-qualified path an
/// inherent associated function takes precedence over a trait method of the
/// same name, so these reach the inherent implementations rather than
/// calling back into this trait impl.
impl<'row> RowAccess<'row> for Row {
// Value of the first column named `name`, if any.
fn get(&'row self, name: &str) -> Option<&'row Value> {
Row::get(self, name)
}
// Value at position `idx`, if in range.
fn get_by_pos(&'row self, idx: usize) -> Option<&'row Value> {
Row::get_by_pos(self, idx)
}
// Name of the column at position `idx`, if in range.
fn column_name(&'row self, idx: usize) -> Option<&'row str> {
Row::column_name(self, idx)
}
// Number of columns.
fn len(&self) -> usize {
Row::len(self)
}
// Whether the row has no columns.
fn is_empty(&self) -> bool {
Row::is_empty(self)
}
}
// Unit tests for `Row` and its internal name index.
#[cfg(test)]
mod tests {
use super::*;
use nautilus_core::Value;
// Positional lookup returns each value and `None` past the end.
#[test]
fn test_row_positional_access() {
let row = Row::new(vec![
("id".to_string(), Value::I64(1)),
("name".to_string(), Value::String("Alice".to_string())),
]);
assert_eq!(row.get_by_pos(0), Some(&Value::I64(1)));
assert_eq!(row.get_by_pos(1), Some(&Value::String("Alice".to_string())));
assert_eq!(row.get_by_pos(2), None);
}
// Name lookup on a narrow row (linear scan path), including a miss.
#[test]
fn test_row_named_access() {
let row = Row::new(vec![
("id".to_string(), Value::I64(1)),
("name".to_string(), Value::String("Alice".to_string())),
]);
assert_eq!(row.get("id"), Some(&Value::I64(1)));
assert_eq!(row.get("name"), Some(&Value::String("Alice".to_string())));
assert_eq!(row.get("age"), None);
}
// With a repeated name, `get` returns the first occurrence while both
// columns remain reachable by position.
#[test]
fn test_row_duplicate_columns() {
let row = Row::new(vec![
("id".to_string(), Value::I64(1)),
("id".to_string(), Value::I64(2)),
("name".to_string(), Value::String("Alice".to_string())),
]);
assert_eq!(row.get("id"), Some(&Value::I64(1)));
assert_eq!(row.get_by_pos(0), Some(&Value::I64(1)));
assert_eq!(row.get_by_pos(1), Some(&Value::I64(2)));
}
// `iter` yields `(name, value)` pairs in column order.
#[test]
fn test_row_iterator() {
let row = Row::new(vec![
("id".to_string(), Value::I64(1)),
("name".to_string(), Value::String("Alice".to_string())),
]);
let items: Vec<_> = row.iter().collect();
assert_eq!(items.len(), 2);
assert_eq!(items[0], ("id", &Value::I64(1)));
assert_eq!(items[1], ("name", &Value::String("Alice".to_string())));
}
// An empty row reports zero length and misses on every lookup.
#[test]
fn test_row_empty() {
let row = Row::new(vec![]);
assert!(row.is_empty());
assert_eq!(row.len(), 0);
assert_eq!(row.get_by_pos(0), None);
assert_eq!(row.get("any"), None);
}
// `column_name` returns the name at a position and `None` past the end.
#[test]
fn test_row_column_name() {
let row = Row::new(vec![
("id".to_string(), Value::I64(1)),
("name".to_string(), Value::String("Alice".to_string())),
]);
assert_eq!(row.column_name(0), Some("id"));
assert_eq!(row.column_name(1), Some("name"));
assert_eq!(row.column_name(2), None);
}
// `columns` exposes the underlying `(name, value)` pairs as a slice.
#[test]
fn test_row_columns_slice() {
let row = Row::new(vec![
("x".to_string(), Value::I64(10)),
("y".to_string(), Value::Bool(false)),
]);
let cols = row.columns();
assert_eq!(cols.len(), 2);
assert_eq!(cols[0].0.as_ref(), "x");
assert_eq!(cols[0].1, Value::I64(10));
assert_eq!(cols[1].0.as_ref(), "y");
assert_eq!(cols[1].1, Value::Bool(false));
}
// Builds THRESHOLD + 1 columns (`0..=THRESHOLD`) so `get` takes the indexed
// path instead of the linear scan; checks a hit on the last column and a miss.
#[test]
fn test_row_wide_named_access_uses_index_without_cloning_names() {
let mut columns = Vec::new();
for idx in 0..=LINEAR_SCAN_LOOKUP_THRESHOLD {
columns.push((format!("col_{idx}"), Value::I64(idx as i64)));
}
let row = Row::new(columns);
assert_eq!(
row.get(&format!("col_{}", LINEAR_SCAN_LOOKUP_THRESHOLD)),
Some(&Value::I64(LINEAR_SCAN_LOOKUP_THRESHOLD as i64))
);
assert_eq!(row.get("missing"), None);
}
// Hand-builds an index where two different names sit under the same hash
// key (42) to simulate a collision; `find_hashed` must pick the position
// whose actual name matches and return `None` for an unknown name.
#[test]
fn test_row_name_index_disambiguates_colliding_candidates() {
let columns: Vec<(Arc<str>, Value)> = vec![
(Arc::from("first"), Value::I64(1)),
(Arc::from("second"), Value::I64(2)),
];
let mut entries = NameIndexMap::with_capacity_and_hasher(1, BuildHasherDefault::default());
entries.insert(42, NameIndexEntry::Multiple(vec![0, 1]));
let index = RowNameIndex { entries };
assert_eq!(index.find_hashed(&columns, 42, "first"), Some(0));
assert_eq!(index.find_hashed(&columns, 42, "second"), Some(1));
assert_eq!(index.find_hashed(&columns, 42, "third"), None);
}
}