use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use bytes::Bytes;
use snafu::{OptionExt, ResultExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use crate::error::{self, Result};
use crate::proto::stream::Kind;
use crate::proto::{ColumnEncoding, CompressionKind, StripeFooter, StripeInformation};
use crate::reader::decompress::Decompressor;
use crate::reader::schema::TypeDescription;
use crate::reader::Reader;
pub mod binary;
pub mod boolean;
pub mod date;
pub mod float;
pub mod int;
pub mod present;
pub mod string;
pub mod timestamp;
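
/// Raw stream data for a single column of a stripe, together with the metadata
/// needed to decode it.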
#[derive(Debug)]
pub struct Column {
    data: Bytes,
    number_of_rows: u64,
    compression: CompressionKind,
    footer: Arc<StripeFooter>,
    name: String,
    column: Arc<TypeDescription>,
}
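
// Shared body for the sync and async stream readers: seek to `$start`, read exactly
// `$length` bytes, and return them as `Bytes`. The optional trailing tokens let the
// async caller splice `.await` in after each I/O call.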
macro_rules! impl_read_stream {
    ($reader:ident,$start:ident,$length:ident $($_await:tt)*) => {{
        $reader
            .inner
            .seek(SeekFrom::Start($start))$($_await)*
            .context(error::IoSnafu)?;
        let mut scratch = vec![0; $length];
        $reader
            .inner
            .read_exact(&mut scratch)$($_await)*
            .context(error::IoSnafu)?;
        Ok(Bytes::from(scratch))
    }};
}
impl Column {
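    /// Seek to `start` and read exactly `length` bytes from the underlying
    /// synchronous reader.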
    pub fn read_stream<R: Read + Seek>(
        reader: &mut Reader<R>,
        start: u64,
        length: usize,
    ) -> Result<Bytes> {
        impl_read_stream!(reader, start, length)
    }
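
    /// Async counterpart of [`Self::read_stream`]; the trailing `.await` tokens passed
    /// to the macro are spliced in after each I/O call.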
    pub async fn read_stream_async<R: AsyncRead + AsyncSeek + Unpin + Send>(
        reader: &mut Reader<R>,
        start: u64,
        length: usize,
    ) -> Result<Bytes> {
        impl_read_stream!(reader, start, length.await)
    }
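
    /// Locate a column's data within a stripe: the absolute offset of its first
    /// non-index stream and the combined length of all of its non-index streams.
    /// This relies on a column's streams being laid out contiguously in the stripe.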
    pub fn get_stream_info(
        name: &str,
        column: &Arc<TypeDescription>,
        footer: &Arc<StripeFooter>,
        stripe: &StripeInformation,
    ) -> Result<(u64, usize)> {
        let column_idx = column.column_id() as u32;
        // Running end offset of each stream, relative to the start of the stripe.
        let mut start = 0;
        let start = footer
            .streams
            .iter()
            .map(|stream| {
                start += stream.length();
                (start, stream)
            })
            .find(|(_, stream)| stream.column() == column_idx && stream.kind() != Kind::RowIndex)
            .map(|(start, stream)| start - stream.length())
            .with_context(|| error::InvalidColumnSnafu { name })?;
        let length = footer
            .streams
            .iter()
            .filter(|stream| stream.column() == column_idx && stream.kind() != Kind::RowIndex)
            .fold(0, |acc, stream| acc + stream.length()) as usize;
        let start = stripe.offset() + start;
        Ok((start, length))
    }
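
    /// Read a column's stream data from the stripe and bundle it with the
    /// metadata needed to decode it.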
    pub fn new<R: Read + Seek>(
        reader: &mut Reader<R>,
        compression: CompressionKind,
        name: &str,
        column: &Arc<TypeDescription>,
        footer: &Arc<StripeFooter>,
        stripe: &StripeInformation,
    ) -> Result<Self> {
        let (start, length) = Column::get_stream_info(name, column, footer, stripe)?;
        let data = Column::read_stream(reader, start, length)?;
        Ok(Self {
            data,
            number_of_rows: stripe.number_of_rows(),
            compression,
            footer: footer.clone(),
            column: column.clone(),
            name: name.to_string(),
        })
    }
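
    /// Async counterpart of [`Self::new`].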
    pub async fn new_async<R: AsyncRead + AsyncSeek + Unpin + Send>(
        reader: &mut Reader<R>,
        compression: CompressionKind,
        name: &str,
        column: &Arc<TypeDescription>,
        footer: &Arc<StripeFooter>,
        stripe: &StripeInformation,
    ) -> Result<Self> {
        let (start, length) = Column::get_stream_info(name, column, footer, stripe)?;
        let data = Column::read_stream_async(reader, start, length).await?;
        Ok(Self {
            data,
            number_of_rows: stripe.number_of_rows(),
            compression,
            footer: footer.clone(),
            column: column.clone(),
            name: name.to_string(),
        })
    }
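
    /// Build a [`Decompressor`] over this column's stream of the given `kind`,
    /// or return `None` if the column has no such stream.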
    pub fn stream(&self, kind: Kind) -> Option<Result<Decompressor>> {
        let column_id = self.column.column_id() as u32;
        // Running end offset of each of this column's streams within `self.data`.
        let mut start = 0;
        self.footer
            .streams
            .iter()
            .filter(|stream| stream.column() == column_id && stream.kind() != Kind::RowIndex)
            .map(|stream| {
                start += stream.length() as usize;
                stream
            })
            .find(|stream| stream.kind() == kind)
            .map(|stream| {
                let length = stream.length() as usize;
                let data = self.data.slice((start - length)..start);
                Decompressor::new(data, self.compression, vec![])
            })
            .map(Ok)
    }
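
    /// Dictionary size recorded for this column in the stripe footer, or 0 if absent.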
    pub fn dictionary_size(&self) -> usize {
        let column = self.column.column_id();
        self.footer.columns[column]
            .dictionary_size
            .unwrap_or_default() as usize
    }
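
    /// The column encoding recorded in the stripe footer.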
    pub fn encoding(&self) -> ColumnEncoding {
        let column = self.column.column_id();
        self.footer.columns[column].clone()
    }
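
    /// Number of rows in the stripe this column belongs to.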
    pub fn number_of_rows(&self) -> usize {
        self.number_of_rows as usize
    }
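
    /// The logical type kind of this column.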
    pub fn kind(&self) -> crate::proto::r#type::Kind {
        self.column.kind()
    }
}
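
/// Iterator that zips a present (non-null) stream with a stream of decoded values,
/// yielding `None` for rows whose present bit is unset.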
pub struct NullableIterator<T> {
    present: Box<dyn Iterator<Item = bool> + Send>,
    iter: Box<dyn Iterator<Item = Result<T>> + Send>,
}
impl<T> Iterator for NullableIterator<T> {
    type Item = Result<Option<T>>;

    fn next(&mut self) -> Option<Self::Item> {
        // The present stream drives iteration: one entry per row.
        match self.present.next()? {
            // Row has a value: pull the next decoded value from the inner iterator.
            true => match self.iter.next() {
                Some(Ok(value)) => Some(Ok(Some(value))),
                Some(Err(err)) => Some(Err(err)),
                None => None,
            },
            // Row is null: nothing is consumed from the inner iterator.
            false => Some(Ok(None)),
        }
    }
}