1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use std::rc::Rc;
use std::sync::{Arc, Mutex};

use arrow::array::{BinaryArray, UInt64Array};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::{MemTable, RecordBatchIterator, TableProvider};
use datafusion::error::ExecutionError;

use crate::iterator::ValueCountItem;

/// Wrapper type to provide a simpler iteration over TableProvider.
///
/// Holds a shared (`Rc`) handle to the provider; every call to [`FlatTable::iter`]
/// starts a fresh scan of it.
pub struct FlatTable {
    table_provider: Rc<dyn TableProvider>,
}

impl FlatTable {
    /// Wraps an existing table provider.
    pub fn new(table_provider: Rc<dyn TableProvider>) -> Self {
        FlatTable { table_provider }
    }

    /// Starts a new scan of the wrapped table and returns a row iterator over it.
    ///
    /// # Errors
    ///
    /// Returns a human-readable description if the underlying scan fails.
    pub fn iter(&self) -> Result<FlatTableIterator, String> {
        FlatTableIterator::new(&self.table_provider)
    }

    /// Returns another shared handle to the wrapped provider
    /// (a reference-count bump, not a copy of the table).
    pub fn table_provider(&self) -> Rc<dyn TableProvider> {
        Rc::clone(&self.table_provider)
    }
}

impl From<MemTable> for FlatTable {
    fn from(mt: MemTable) -> Self {
        FlatTable {
            table_provider: Rc::new(mt),
        }
    }
}

/// Actual iterator.
///
/// Row-by-row cursor over the partitions produced by `TableProvider::scan`. Each row is
/// read from column 0 (downcast to `BinaryArray`, decoded as UTF-8) and column 1
/// (downcast to `UInt64Array`).
pub struct FlatTableIterator {
    /// Partition iterators returned by the scan, consumed in order.
    partitions: Vec<Arc<Mutex<dyn RecordBatchIterator>>>,
    /// Index into `partitions` of the partition currently being read.
    partitions_pos: usize,
    /// Batch currently being read, or `None` when a new one must be fetched.
    batch: Option<RecordBatch>,
    /// Row within `batch` that the next read will use.
    batch_row: usize,
}

impl FlatTableIterator {
    /// Starts a scan of `table_provider` and returns a cursor positioned before the first row.
    ///
    /// # Errors
    ///
    /// Returns a formatted message if the scan fails.
    fn new(table_provider: &Rc<dyn TableProvider>) -> Result<FlatTableIterator, String> {
        // NOTE(review): `&None` presumably means "no projection" (all columns) and `100`
        // the batch size — confirm against the `TableProvider::scan` signature.
        let partitions = table_provider
            .scan(&None, 100)
            .map_err(|e| format!("scanning the table {:?}", e))?;
        Ok(FlatTableIterator {
            partitions,
            partitions_pos: 0,
            batch: None,
            batch_row: 0,
        })
    }

    /// Advances the iterator state to point at the next readable row.
    ///
    /// Returns `Ok(true)` when `batch`/`batch_row` refer to a valid row and `Ok(false)`
    /// once every partition is exhausted. Errors from fetching a batch are propagated.
    ///
    /// # Panics
    ///
    /// Panics if a partition's mutex is poisoned.
    ///
    /// Written as a loop rather than recursion so that long runs of empty batches or
    /// exhausted partitions cannot grow the call stack.
    fn advance_current(&mut self) -> Result<bool, ExecutionError> {
        loop {
            if let Some(batch) = &self.batch {
                if self.batch_row < batch.num_rows() {
                    return Ok(true);
                }
                // Current batch fully consumed; drop it and fetch another.
                self.batch = None;
            }

            // A fresh batch is about to be read: start from its first row.
            self.batch_row = 0;

            let next_batch = match self.partitions.get(self.partitions_pos) {
                Some(partition) => {
                    let mut batches = partition.lock().unwrap();
                    batches.next()?
                }
                // Out of scan results.
                None => return Ok(false),
            };

            match next_batch {
                Some(batch) => self.batch = Some(batch),
                // This partition is done; move on to the next one.
                None => self.partitions_pos += 1,
            }
        }
    }

    /// Reads column 0 of the current row as a UTF-8 `String`.
    ///
    /// Returns `None` when there is no current batch, column 0 is not a `BinaryArray`,
    /// or its bytes are not valid UTF-8.
    fn current_value(&self) -> Option<String> {
        let batch = self.batch.as_ref()?;
        // note: silently propagating None from the downcast and squashing String::from_utf8 errors
        let column = batch.column(0).as_any().downcast_ref::<BinaryArray>()?;
        // NOTE(review): null slots are not checked before reading — confirm column 0 is non-nullable.
        String::from_utf8(column.value(self.batch_row).to_vec()).ok()
    }

    /// Reads column 1 of the current row as a `u64`.
    ///
    /// Returns `None` when there is no current batch or column 1 is not a `UInt64Array`.
    fn current_count(&self) -> Option<u64> {
        let batch = self.batch.as_ref()?;
        // note: silently propagating None from the downcast
        let column = batch.column(1).as_any().downcast_ref::<UInt64Array>()?;
        Some(column.value(self.batch_row))
    }
}

impl Iterator for FlatTableIterator {
    type Item = ValueCountItem;

    /// Yields a `ValueCountItem` built from column 0 and column 1 of the next row.
    ///
    /// Returns `None` in any of these cases:
    /// - every partition is exhausted;
    /// - fetching a batch fails (the error is discarded);
    /// - the current row cannot be decoded, because column 0 is not a `BinaryArray` of
    ///   valid UTF-8 or column 1 is not a `UInt64Array`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.advance_current() {
            Ok(true) => {}
            // NOTE(review): scan errors are swallowed and look like end-of-data to callers.
            Ok(false) | Err(_) => return None,
        }

        // advance_current guarantees a current row exists, but the column downcasts and the
        // UTF-8 decode can still fail, so both reads are treated as optional.
        let value = self.current_value();
        let count = self.current_count();
        // Step past this row even when it could not be decoded, so a later call moves on.
        self.batch_row += 1;

        // An unreadable count is handled like an unreadable value (None) instead of panicking.
        value.and_then(|v| count.map(|c| ValueCountItem(v, c)))
    }
}