// iguazu 0.0.1
//
// Tools for viewing, storing, and sharing mixed-signal time series data
// Documentation
use std::{sync::Arc, time::Duration};

use jiff::{Timestamp, civil::DateTime, fmt::temporal::Pieces, tz::TimeZone};

use crate::{ElementSize, import::ImportError, schema::{Entity, EntitySchema, EntityStream, FieldKind, Ignored}, storage::MemoryStreamWriter, stream::Stream};

/// Resolution in which a [`Duration`] is counted when converted to an integer.
///
/// A unit is selected from a rate expressed in units per second (see
/// [`TimeUnit::from_rate`]) and applied with [`TimeUnit::scale`].
#[derive(Copy, Clone)]
pub(crate) enum TimeUnit {
    /// Whole seconds (rate `1.0`).
    Second,
    /// Whole milliseconds (rate `1_000.0`).
    Millisecond,
    /// Whole microseconds (rate `1_000_000.0`).
    Microsecond,
    /// Whole nanoseconds (rate `1_000_000_000.0`).
    Nanosecond,
}

impl TimeUnit {
    /// Looks up the unit whose units-per-second count equals `rate` exactly.
    ///
    /// Returns `None` for every other value, including NaN and rates that are
    /// merely close to a supported one.
    pub fn from_rate(rate: f64) -> Option<Self> {
        const SUPPORTED: [(f64, TimeUnit); 4] = [
            (1.0, TimeUnit::Second),
            (1_000.0, TimeUnit::Millisecond),
            (1_000_000.0, TimeUnit::Microsecond),
            (1_000_000_000.0, TimeUnit::Nanosecond),
        ];

        SUPPORTED
            .iter()
            .find(|(per_second, _)| *per_second == rate)
            .map(|&(_, unit)| unit)
    }

    /// Expresses `d` as a whole number of this unit, discarding any remainder.
    ///
    /// Returns `None` when the count does not fit in a `u64`. Seconds always
    /// fit because [`Duration::as_secs`] already yields a `u64`.
    pub fn scale(self, d: Duration) -> Option<u64> {
        let count: u128 = match self {
            TimeUnit::Second => return Some(d.as_secs()),
            TimeUnit::Millisecond => d.as_millis(),
            TimeUnit::Microsecond => d.as_micros(),
            TimeUnit::Nanosecond => d.as_nanos(),
        };
        count.try_into().ok()
    }
}

/// Per-column state that turns the raw bytes of one CSV cell into stream data.
///
/// Every variant except `Skip` owns the [`MemoryStreamWriter`]s it appends to.
/// `ColumnParser::parse` defines the exact encoding each variant writes.
pub(crate) enum ColumnParser {
    /// Accepts any cell and writes nothing.
    Skip,
    /// Parses each cell as an `f32` and appends its native-endian bytes.
    Float32(MemoryStreamWriter),
    /// Stores each cell verbatim as a variable-length run of bytes.
    String {
        /// Receives the `chars` write position after each cell is appended.
        ends: MemoryStreamWriter,
        /// Receives the raw cell bytes, back to back.
        chars: MemoryStreamWriter,
    },
    /// Parses calendar timestamps and stores them as a `u64` count of `unit`s
    /// elapsed since `epoch`.
    TimestampIso {
        /// Instant that corresponds to a stored value of zero.
        epoch: Timestamp,
        /// Resolution of the stored count.
        unit: TimeUnit,
        /// Zone used to interpret cells that carry no UTC offset.
        default_zone: TimeZone,
        /// Receives the native-endian `u64` counts.
        writer: MemoryStreamWriter,
    },
    /// Parses decimal numbers, multiplies them by `scale`, rounds, and stores
    /// the result as a `u64`.
    TimestampRelative {
        /// Factor applied to the parsed value before rounding.
        scale: f64,
        /// Receives the native-endian `u64` results.
        writer: MemoryStreamWriter,
    },
}

impl ColumnParser {
    pub fn new(schema: &EntitySchema) -> Result<(EntityStream, ColumnParser), ImportError>{
        Ok(match schema {
            Entity::Data { data: Ignored, field, .. } => {
                let (data, parser) = match &field.kind {
                    FieldKind::Float32 => {
                        let writer = MemoryStreamWriter::new(ElementSize::U32);
                        let data = writer.stream().clone() as Arc<dyn Stream>;
                        (data, ColumnParser::Float32(writer))
                    }
                    FieldKind::Timestamp => {
                        let writer = MemoryStreamWriter::new(ElementSize::U64);
                        let data = writer.stream().clone() as Arc<dyn Stream>;

                        let Some(rate) = field.time_rate() else {
                          return Err(ImportError::SchemaMismatch("Timestamp rate must be defined".into()));
                        };

                        let parser = match field.time_epoch() {
                            Some(epoch) => {
                                let Some(unit) = TimeUnit::from_rate(rate) else {
                                    return Err(ImportError::SchemaMismatch("Timestamp unit must be s, ms, us, or ns".into()));
                                };
                                ColumnParser::TimestampIso { epoch, unit, writer, default_zone: TimeZone::UTC }
                            }
                            None => ColumnParser::TimestampRelative { scale: rate, writer }
                        };
                        (data, parser)
                    }
                    k => {
                        return Err(ImportError::SchemaMismatch(format!("Field type {:?} not supported in CSV column", k)));
                    }
                };

                let entity = Entity::Data { data, field: field.clone(), summaries: Default::default() };
                (entity, parser)
            }

            Entity::VariableArray { data: Ignored, child, attributes } => {
                match **child {
                    Entity::Data { data: Ignored, ref field, ..} => {
                        let ends = MemoryStreamWriter::new(ElementSize::U64);
                        let ends_stream = ends.stream().clone() as Arc<dyn Stream>;

                        let (data, parser) = match &field.kind {
                            FieldKind::Character => {
                                let chars = MemoryStreamWriter::new(ElementSize::U8);
                                let data = chars.stream().clone() as Arc<dyn Stream>;
                                (data, ColumnParser::String { ends, chars })
                            }
                            k => {
                                return Err(ImportError::SchemaMismatch(format!("Field type {:?} not supported in CSV column", k)));
                            }
                        };

                        let inner = Entity::Data { data, field: field.clone(), summaries: Default::default() };

                        let entity = Entity::VariableArray {
                            data: ends_stream,
                            child: Box::new(inner),
                            attributes: attributes.clone(),
                        };

                        (entity, parser)
                    }

                    ref k => {
                        return Err(ImportError::SchemaMismatch(format!("VariableArray child type {:?} not supported in CSV column", k)));
                    }
                }
            }

            ref k => {
                return Err(ImportError::SchemaMismatch(format!("Field type {:?} not supported in CSV column", k)));
            }
        })
    }

    /// Parses the raw bytes of one cell and appends the encoded value to this
    /// column's writer(s).
    ///
    /// Encoding per variant (all integers and floats in native byte order):
    /// - `Skip`: nothing is written.
    /// - `Float32`: the cell parsed as `f32`.
    /// - `String`: the cell bytes verbatim into `chars`, then the resulting
    ///   `chars` position into `ends`.
    /// - `TimestampIso`: the cell parsed as a calendar timestamp (a missing
    ///   time of day defaults to the `Time` default, a missing offset is
    ///   resolved in `default_zone`), stored as a `u64` count of `unit`s
    ///   since `epoch`.
    /// - `TimestampRelative`: the cell parsed as `f64`, multiplied by `scale`,
    ///   rounded to the nearest integer and stored as `u64`.
    ///
    /// # Errors
    ///
    /// Returns a short static label naming what was wrong with the cell:
    /// an unparsable value, a timestamp before the epoch, or a value that
    /// does not fit in a `u64`. Nothing is written for the cell on error,
    /// since every failure path returns before the first write.
    pub fn parse(&mut self, value: &[u8]) -> Result<(), &'static str> {
        match self {
            ColumnParser::Skip => Ok(()),
            ColumnParser::Float32(writer) => {
                let f = lexical_core::parse::<f32>(value).map_err(|_| "float")?;
                writer.extend_from_slice(&f.to_ne_bytes());
                Ok(())
            }
            ColumnParser::String { ends, chars } => {
                chars.extend_from_slice(value);
                // Record where this cell ends so readers can slice `chars`.
                // NOTE(review): assumes `pos()` is 64 bits wide, matching the
                // U64 element size `new` gives the `ends` stream — confirm.
                ends.extend_from_slice(&chars.pos().to_ne_bytes());
                Ok(())
            }
            ColumnParser::TimestampIso { epoch, unit, default_zone, writer } => {
                let pieces = Pieces::parse(value).map_err(|_| "timestamp")?;
                let dt = DateTime::from_parts(pieces.date(), pieces.time().unwrap_or_default());
                // An explicit offset in the cell wins; otherwise fall back to
                // the column's default zone.
                let t = match pieces.offset() {
                    Some(offset) => offset.to_numeric_offset().to_timestamp(dt),
                    None => default_zone.to_timestamp(dt),
                }
                .map_err(|_| "timestamp (invalid)")?;
                // Converting the signed duration to an unsigned one is what
                // rejects instants earlier than the epoch.
                let d = Duration::try_from(t.duration_since(*epoch))
                    .map_err(|_| "timestamp (prior to epoch)")?;
                let val: u64 = unit.scale(d).ok_or("timestamp (out of range)")?;
                writer.extend_from_slice(&val.to_ne_bytes());
                Ok(())
            }
            ColumnParser::TimestampRelative { writer, scale } => {
                let f = lexical_core::parse::<f64>(value).map_err(|_| "decimal timestamp")?;

                let fv = (*scale * f).round();
                // NaN compares false against everything, so it must be rejected
                // explicitly; otherwise `NaN as u64` would silently store 0.
                // `u64::MAX as f64` rounds up to exactly 2^64, which is already
                // one past the largest storable value, hence `>=` rather than `>`.
                if fv.is_nan() || fv < 0.0 || fv >= (u64::MAX as f64) {
                    return Err("decimal timestamp (out of range)");
                }
                let v = fv as u64;

                writer.extend_from_slice(&v.to_ne_bytes());
                Ok(())
            }
        }
    }

    /// Calls `commit` on every writer this parser owns.
    ///
    /// `Skip` owns no writer and does nothing. For `String`, `ends` is
    /// committed before `chars`.
    // NOTE(review): presumably `commit` makes the data appended so far visible
    // to stream readers — confirm against `MemoryStreamWriter`.
    pub fn commit(&mut self) {
        match self {
            ColumnParser::Skip => {}
            ColumnParser::String { ends, chars } => {
                ends.commit();
                chars.commit();
            }
            ColumnParser::Float32(writer)
            | ColumnParser::TimestampIso { writer, .. }
            | ColumnParser::TimestampRelative { writer, .. } => writer.commit(),
        }
    }
}