manifoldb_query/exec/
row.rs

//! Row types for query execution.
//!
//! This module defines the [`Row`] type used as the unit of data
//! flowing through the execution operators.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use manifoldb_core::Value;
10
/// Describes a row layout: the column names and the position of each one.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Schema {
    /// Column names in positional order (`Arc<str>` keeps copies cheap).
    columns: Vec<Arc<str>>,
    /// Reverse lookup from a column name to its position.
    name_to_index: HashMap<Arc<str>, usize>,
}

impl Schema {
    /// Builds a schema from owned column names, preserving their order.
    #[must_use]
    pub fn new(columns: Vec<String>) -> Self {
        let shared: Vec<Arc<str>> = columns.into_iter().map(Arc::<str>::from).collect();
        Self::from_arcs(shared)
    }

    /// Builds a schema from names that are already `Arc<str>`.
    ///
    /// The names are stored as given; the lookup table only clones the `Arc`
    /// handles. If a name occurs more than once, the lookup table keeps the
    /// position of its last occurrence.
    #[must_use]
    pub fn from_arcs(columns: Vec<Arc<str>>) -> Self {
        let mut name_to_index = HashMap::with_capacity(columns.len());
        for (position, name) in columns.iter().enumerate() {
            name_to_index.insert(Arc::clone(name), position);
        }
        Self { columns, name_to_index }
    }

    /// Returns a schema that has no columns.
    #[inline]
    #[must_use]
    pub fn empty() -> Self {
        Self::from_arcs(Vec::new())
    }

    /// Collects the column names, in order, into a freshly allocated vector of `&str`.
    #[inline]
    #[must_use]
    pub fn columns(&self) -> Vec<&str> {
        self.columns.iter().map(|column| &**column).collect()
    }

    /// Borrows the column names in their shared `Arc<str>` form.
    #[must_use]
    pub fn columns_arc(&self) -> &[Arc<str>] {
        self.columns.as_slice()
    }

    /// Number of columns in the schema.
    #[inline]
    #[must_use]
    pub fn len(&self) -> usize {
        self.columns.len()
    }

    /// Whether the schema contains no columns at all.
    #[inline]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.columns.is_empty()
    }

    /// Looks up the position of `name`, or `None` when no column has that name.
    #[inline]
    #[must_use]
    pub fn index_of(&self, name: &str) -> Option<usize> {
        let position = self.name_to_index.get(name)?;
        Some(*position)
    }

    /// Looks up the name of the column at `index`, or `None` when out of range.
    #[inline]
    #[must_use]
    pub fn column_at(&self, index: usize) -> Option<&str> {
        let column = self.columns.get(index)?;
        Some(&**column)
    }

    /// Returns a copy of this schema with `name` appended as the last column.
    #[must_use]
    pub fn with_column(&self, name: impl Into<String>) -> Self {
        let mut columns = Vec::with_capacity(self.columns.len() + 1);
        columns.extend_from_slice(&self.columns);
        columns.push(Arc::<str>::from(name.into()));
        Self::from_arcs(columns)
    }

    /// Returns a schema holding this schema's columns followed by `other`'s.
    ///
    /// Only the `Arc<str>` handles are cloned; the names themselves are shared.
    #[must_use]
    pub fn merge(&self, other: &Schema) -> Self {
        let columns: Vec<Arc<str>> =
            self.columns.iter().chain(other.columns.iter()).cloned().collect();
        Self::from_arcs(columns)
    }

    /// Returns a schema made of the columns at `indices`, in the order given.
    ///
    /// Indices that are out of range are skipped.
    #[must_use]
    pub fn project(&self, indices: &[usize]) -> Self {
        let mut columns = Vec::with_capacity(indices.len());
        for &index in indices {
            if let Some(column) = self.columns.get(index) {
                columns.push(Arc::clone(column));
            }
        }
        Self::from_arcs(columns)
    }
}
111
112impl Default for Schema {
113    fn default() -> Self {
114        Self::empty()
115    }
116}
117
118impl From<Vec<String>> for Schema {
119    fn from(columns: Vec<String>) -> Self {
120        Self::new(columns)
121    }
122}
123
124impl From<Vec<&str>> for Schema {
125    fn from(columns: Vec<&str>) -> Self {
126        Self::new(columns.into_iter().map(String::from).collect())
127    }
128}
129
130/// A row of values.
131///
132/// Rows are the unit of data flowing through execution operators.
133/// Each row contains values that correspond to the schema columns.
134#[derive(Debug, Clone, PartialEq)]
135pub struct Row {
136    /// The schema describing the columns.
137    schema: Arc<Schema>,
138    /// The values in this row.
139    values: Vec<Value>,
140}
141
142impl Row {
143    /// Creates a new row with the given schema and values.
144    ///
145    /// # Panics
146    ///
147    /// Panics if the number of values doesn't match the schema.
148    #[must_use]
149    pub fn new(schema: Arc<Schema>, values: Vec<Value>) -> Self {
150        debug_assert_eq!(
151            schema.len(),
152            values.len(),
153            "Row values count must match schema column count"
154        );
155        Self { schema, values }
156    }
157
158    /// Creates a row with a single value.
159    #[must_use]
160    pub fn single(name: impl Into<String>, value: Value) -> Self {
161        let schema = Arc::new(Schema::new(vec![name.into()]));
162        Self { schema, values: vec![value] }
163    }
164
165    /// Creates an empty row with the given schema.
166    #[must_use]
167    pub fn empty(schema: Arc<Schema>) -> Self {
168        let values = vec![Value::Null; schema.len()];
169        Self { schema, values }
170    }
171
172    /// Returns the schema of this row.
173    #[inline]
174    #[must_use]
175    pub fn schema(&self) -> &Schema {
176        &self.schema
177    }
178
179    /// Returns the shared schema reference.
180    #[inline]
181    #[must_use]
182    pub fn schema_arc(&self) -> Arc<Schema> {
183        Arc::clone(&self.schema)
184    }
185
186    /// Returns the values in this row.
187    #[inline]
188    #[must_use]
189    pub fn values(&self) -> &[Value] {
190        &self.values
191    }
192
193    /// Returns the number of columns.
194    #[inline]
195    #[must_use]
196    pub fn len(&self) -> usize {
197        self.values.len()
198    }
199
200    /// Returns true if the row has no columns.
201    #[inline]
202    #[must_use]
203    pub fn is_empty(&self) -> bool {
204        self.values.is_empty()
205    }
206
207    /// Gets a value by column index.
208    #[inline]
209    #[must_use]
210    pub fn get(&self, index: usize) -> Option<&Value> {
211        self.values.get(index)
212    }
213
214    /// Gets a value by column name.
215    #[inline]
216    #[must_use]
217    pub fn get_by_name(&self, name: &str) -> Option<&Value> {
218        self.schema.index_of(name).and_then(|i| self.values.get(i))
219    }
220
221    /// Gets a mutable value by column index.
222    #[inline]
223    pub fn get_mut(&mut self, index: usize) -> Option<&mut Value> {
224        self.values.get_mut(index)
225    }
226
227    /// Sets a value by column index.
228    ///
229    /// Returns the old value if the index was valid.
230    #[inline]
231    pub fn set(&mut self, index: usize, value: Value) -> Option<Value> {
232        if index < self.values.len() {
233            Some(std::mem::replace(&mut self.values[index], value))
234        } else {
235            None
236        }
237    }
238
239    /// Creates a new row by projecting to specific column indices.
240    #[must_use]
241    pub fn project(&self, indices: &[usize]) -> Self {
242        let schema = Arc::new(self.schema.project(indices));
243        let values: Vec<Value> =
244            indices.iter().filter_map(|&i| self.values.get(i).cloned()).collect();
245        Self { schema, values }
246    }
247
248    /// Creates a new row by merging with another row.
249    #[must_use]
250    pub fn merge(&self, other: &Row) -> Self {
251        let schema = Arc::new(self.schema.merge(&other.schema));
252        let mut values = self.values.clone();
253        values.extend(other.values.iter().cloned());
254        Self { schema, values }
255    }
256
257    /// Consumes self and merges with another row's values (borrowed).
258    /// More efficient than `merge` when the left row can be consumed.
259    #[must_use]
260    pub fn merge_consume_left(mut self, other: &Row) -> Self {
261        let schema = Arc::new(self.schema.merge(&other.schema));
262        self.values.extend(other.values.iter().cloned());
263        Self { schema, values: self.values }
264    }
265
266    /// Consumes both rows and merges them.
267    /// Most efficient merge operation when both rows can be consumed.
268    #[must_use]
269    pub fn merge_consume_both(mut self, mut other: Row) -> Self {
270        let schema = Arc::new(self.schema.merge(&other.schema));
271        self.values.append(&mut other.values);
272        Self { schema, values: self.values }
273    }
274
275    /// Consumes the row and returns the values.
276    #[must_use]
277    pub fn into_values(self) -> Vec<Value> {
278        self.values
279    }
280
281    /// Converts the row to a map of column names to values.
282    #[must_use]
283    pub fn to_map(&self) -> HashMap<String, Value> {
284        self.schema
285            .columns_arc()
286            .iter()
287            .zip(self.values.iter())
288            .map(|(name, value)| (name.to_string(), value.clone()))
289            .collect()
290    }
291}
292
293impl IntoIterator for Row {
294    type Item = (Arc<str>, Value);
295    type IntoIter = std::iter::Zip<std::vec::IntoIter<Arc<str>>, std::vec::IntoIter<Value>>;
296
297    fn into_iter(self) -> Self::IntoIter {
298        self.schema.columns.iter().map(Arc::clone).collect::<Vec<_>>().into_iter().zip(self.values)
299    }
300}
301
302/// A batch of rows for efficient processing.
303///
304/// Row batches allow vectorized operations on multiple rows at once.
305#[derive(Debug, Clone)]
306pub struct RowBatch {
307    /// The schema shared by all rows.
308    schema: Arc<Schema>,
309    /// The rows in this batch.
310    rows: Vec<Row>,
311}
312
313impl RowBatch {
314    /// Creates a new row batch.
315    #[must_use]
316    pub fn new(schema: Arc<Schema>) -> Self {
317        Self { schema, rows: Vec::new() }
318    }
319
320    /// Creates a row batch with the given rows.
321    #[must_use]
322    pub fn with_rows(schema: Arc<Schema>, rows: Vec<Row>) -> Self {
323        Self { schema, rows }
324    }
325
326    /// Returns the schema.
327    #[inline]
328    #[must_use]
329    pub fn schema(&self) -> &Schema {
330        &self.schema
331    }
332
333    /// Returns the shared schema reference.
334    #[inline]
335    #[must_use]
336    pub fn schema_arc(&self) -> Arc<Schema> {
337        Arc::clone(&self.schema)
338    }
339
340    /// Returns the rows in this batch.
341    #[inline]
342    #[must_use]
343    pub fn rows(&self) -> &[Row] {
344        &self.rows
345    }
346
347    /// Returns the number of rows.
348    #[inline]
349    #[must_use]
350    pub fn len(&self) -> usize {
351        self.rows.len()
352    }
353
354    /// Returns true if the batch is empty.
355    #[inline]
356    #[must_use]
357    pub fn is_empty(&self) -> bool {
358        self.rows.is_empty()
359    }
360
361    /// Adds a row to the batch.
362    pub fn push(&mut self, row: Row) {
363        debug_assert_eq!(
364            row.schema().columns(),
365            self.schema.columns(),
366            "Row schema must match batch schema"
367        );
368        self.rows.push(row);
369    }
370
371    /// Removes and returns the last row.
372    pub fn pop(&mut self) -> Option<Row> {
373        self.rows.pop()
374    }
375
376    /// Clears the batch.
377    pub fn clear(&mut self) {
378        self.rows.clear();
379    }
380
381    /// Consumes the batch and returns the rows.
382    #[must_use]
383    pub fn into_rows(self) -> Vec<Row> {
384        self.rows
385    }
386}
387
388impl IntoIterator for RowBatch {
389    type Item = Row;
390    type IntoIter = std::vec::IntoIter<Row>;
391
392    fn into_iter(self) -> Self::IntoIter {
393        self.rows.into_iter()
394    }
395}
396
397impl<'a> IntoIterator for &'a RowBatch {
398    type Item = &'a Row;
399    type IntoIter = std::slice::Iter<'a, Row>;
400
401    fn into_iter(self) -> Self::IntoIter {
402        self.rows.iter()
403    }
404}
405
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a schema from string literals via `Schema::new`.
    fn schema_of(names: &[&str]) -> Schema {
        Schema::new(names.iter().map(|name| (*name).to_string()).collect())
    }

    /// Same as `schema_of`, wrapped in an `Arc` for use with rows and batches.
    fn shared_schema_of(names: &[&str]) -> Arc<Schema> {
        Arc::new(schema_of(names))
    }

    #[test]
    fn schema_basic() {
        let schema = schema_of(&["id", "name"]);

        assert_eq!(schema.len(), 2);
        assert_eq!(schema.index_of("id"), Some(0));
        assert_eq!(schema.index_of("name"), Some(1));
        assert_eq!(schema.index_of("unknown"), None);
    }

    #[test]
    fn schema_merge() {
        let left = schema_of(&["a"]);
        let right = schema_of(&["b"]);

        let merged = left.merge(&right);

        assert_eq!(merged.columns(), &["a", "b"]);
    }

    #[test]
    fn row_basic() {
        let schema = shared_schema_of(&["id", "name"]);
        let values = vec![Value::Int(1), Value::from("Alice")];
        let row = Row::new(Arc::clone(&schema), values);

        assert_eq!(row.len(), 2);
        assert_eq!(row.get(0), Some(&Value::Int(1)));
        assert_eq!(row.get_by_name("name"), Some(&Value::from("Alice")));
    }

    #[test]
    fn row_project() {
        let schema = shared_schema_of(&["a", "b", "c"]);
        let values = vec![Value::Int(1), Value::Int(2), Value::Int(3)];
        let row = Row::new(Arc::clone(&schema), values);

        // Keep the first and last column, dropping the middle one.
        let projected = row.project(&[0, 2]);

        assert_eq!(projected.len(), 2);
        assert_eq!(projected.schema().columns(), &["a", "c"]);
        assert_eq!(projected.get(0), Some(&Value::Int(1)));
        assert_eq!(projected.get(1), Some(&Value::Int(3)));
    }

    #[test]
    fn row_merge() {
        let left = Row::new(shared_schema_of(&["a"]), vec![Value::Int(1)]);
        let right = Row::new(shared_schema_of(&["b"]), vec![Value::Int(2)]);

        let merged = left.merge(&right);

        assert_eq!(merged.len(), 2);
        assert_eq!(merged.get(0), Some(&Value::Int(1)));
        assert_eq!(merged.get(1), Some(&Value::Int(2)));
    }

    #[test]
    fn row_batch_basic() {
        let schema = shared_schema_of(&["id"]);
        let mut batch = RowBatch::new(Arc::clone(&schema));

        for id in [Value::Int(1), Value::Int(2)].iter().cloned() {
            batch.push(Row::new(Arc::clone(&schema), vec![id]));
        }

        assert_eq!(batch.len(), 2);
        assert_eq!(batch.rows()[0].get(0), Some(&Value::Int(1)));
    }
}