cross-stream 0.12.0

An event stream store for personal, local-first use, specializing in event sourcing.
Documentation
use std::io::Read;
use std::io::Write;

use chrono::{DateTime, Utc};
use nu_protocol::shell_error::generic::GenericError;
use nu_protocol::{PipelineData, Record, ShellError, Span, Value};

use crate::store::Frame;
use crate::store::Store;

pub fn json_to_value(json: &serde_json::Value, span: Span) -> Value {
    match json {
        serde_json::Value::Null => Value::nothing(span),
        serde_json::Value::Bool(b) => Value::bool(*b, span),
        serde_json::Value::Number(n) => {
            if let Some(i) = n.as_i64() {
                Value::int(i, span)
            } else if let Some(f) = n.as_f64() {
                Value::float(f, span)
            } else {
                Value::string(n.to_string(), span)
            }
        }
        serde_json::Value::String(s) => Value::string(s, span),
        serde_json::Value::Array(arr) => {
            let values: Vec<Value> = arr.iter().map(|v| json_to_value(v, span)).collect();
            Value::list(values, span)
        }
        serde_json::Value::Object(obj) => {
            let mut record = Record::new();
            for (k, v) in obj {
                record.push(k, json_to_value(v, span));
            }
            Value::record(record, span)
        }
    }
}

pub fn frame_to_value(frame: &Frame, span: Span, with_timestamp: bool) -> Value {
    let mut record = Record::new();

    record.push("id", Value::string(frame.id.to_string(), span));
    record.push("topic", Value::string(frame.topic.clone(), span));

    if let Some(hash) = &frame.hash {
        record.push("hash", Value::string(hash.to_string(), span));
    }

    if let Some(meta) = &frame.meta {
        record.push("meta", json_to_value(meta, span));
    }

    if let Some(ttl) = &frame.ttl {
        let ttl_str = match ttl {
            crate::store::TTL::Forever => "forever".to_string(),
            crate::store::TTL::Ephemeral => "ephemeral".to_string(),
            crate::store::TTL::Time(duration) => format!("{}s", duration.as_secs()),
            crate::store::TTL::Last(n) => format!("last:{n}"),
        };
        record.push("ttl", Value::string(ttl_str, span));
    }

    if with_timestamp {
        let millis = frame.id.timestamp() as i64;
        let dt: DateTime<Utc> = DateTime::from_timestamp_millis(millis)
            .unwrap_or_else(|| DateTime::from_timestamp(0, 0).unwrap());
        record.push("timestamp", Value::date(dt.fixed_offset(), span));
    }

    Value::record(record, span)
}

/// Wraps the record produced by [`frame_to_value`] in a `PipelineData::Value`.
///
/// The value is tagged with an unknown span and carries no pipeline metadata.
pub fn frame_to_pipeline(frame: &Frame, with_timestamp: bool) -> PipelineData {
    let value = frame_to_value(frame, Span::unknown(), with_timestamp);
    PipelineData::Value(value, None)
}

pub fn value_to_json(value: &Value) -> serde_json::Value {
    match value {
        Value::Nothing { .. } => serde_json::Value::Null,
        Value::Bool { val, .. } => serde_json::Value::Bool(*val),
        Value::Int { val, .. } => serde_json::Value::Number((*val).into()),
        Value::Float { val, .. } => serde_json::Number::from_f64(*val)
            .map(serde_json::Value::Number)
            .unwrap_or(serde_json::Value::Null),
        Value::String { val, .. } => serde_json::Value::String(val.clone()),
        Value::Filesize { val, .. } => serde_json::Value::Number(val.get().into()),
        Value::Date { val, .. } => serde_json::Value::String(val.to_rfc3339()),
        Value::List { vals, .. } => {
            serde_json::Value::Array(vals.iter().map(value_to_json).collect())
        }
        Value::Record { val, .. } => {
            let mut map = serde_json::Map::new();
            for (k, v) in val.iter() {
                map.insert(k.clone(), value_to_json(v));
            }
            serde_json::Value::Object(map)
        }
        _ => serde_json::Value::Null,
    }
}

pub fn write_pipeline_to_cas(
    input: PipelineData,
    store: &Store,
    span: Span,
) -> Result<Option<ssri::Integrity>, Box<ShellError>> {
    let mut writer = store.cas_writer_sync().map_err(|e| {
        Box::new(ShellError::Generic(GenericError::new(
            "I/O Error",
            e.to_string(),
            span,
        )))
    })?;

    match input {
        PipelineData::Value(value, _) => match value {
            Value::Nothing { .. } => Ok(None),
            Value::String { val, .. } => {
                writer.write_all(val.as_bytes()).map_err(|e| {
                    Box::new(ShellError::Generic(GenericError::new(
                        "I/O Error",
                        e.to_string(),
                        span,
                    )))
                })?;

                let hash = writer.commit().map_err(|e| {
                    Box::new(ShellError::Generic(GenericError::new(
                        "I/O Error",
                        e.to_string(),
                        span,
                    )))
                })?;

                Ok(Some(hash))
            }
            Value::Binary { val, .. } => {
                writer.write_all(&val).map_err(|e| {
                    Box::new(ShellError::Generic(GenericError::new(
                        "I/O Error",
                        e.to_string(),
                        span,
                    )))
                })?;

                let hash = writer.commit().map_err(|e| {
                    Box::new(ShellError::Generic(GenericError::new(
                        "I/O Error",
                        e.to_string(),
                        span,
                    )))
                })?;

                Ok(Some(hash))
            }
            Value::Record { .. } => {
                let json = value_to_json(&value);
                let json_string = serde_json::to_string(&json).map_err(|e| {
                    Box::new(ShellError::Generic(GenericError::new(
                        "I/O Error",
                        e.to_string(),
                        span,
                    )))
                })?;

                writer.write_all(json_string.as_bytes()).map_err(|e| {
                    Box::new(ShellError::Generic(GenericError::new(
                        "I/O Error",
                        e.to_string(),
                        span,
                    )))
                })?;

                let hash = writer.commit().map_err(|e| {
                    Box::new(ShellError::Generic(GenericError::new(
                        "I/O Error",
                        e.to_string(),
                        span,
                    )))
                })?;

                Ok(Some(hash))
            }
            _ => Err(Box::new(ShellError::PipelineMismatch {
                exp_input_type: format!(
                    "expected: string, binary, record, or nothing :: received: {typ:?}",
                    typ = value.get_type()
                ),
                dst_span: span,
                src_span: value.span(),
            })),
        },
        PipelineData::ListStream(_stream, ..) => {
            panic!("ListStream handling is not yet implemented");
        }
        PipelineData::ByteStream(stream, ..) => {
            if let Some(mut reader) = stream.reader() {
                let mut buffer = [0; 8192];
                loop {
                    let bytes_read = reader.read(&mut buffer).map_err(|e| {
                        Box::new(ShellError::Generic(GenericError::new(
                            "I/O Error",
                            e.to_string(),
                            span,
                        )))
                    })?;

                    if bytes_read == 0 {
                        break;
                    }

                    writer.write_all(&buffer[..bytes_read]).map_err(|e| {
                        Box::new(ShellError::Generic(GenericError::new(
                            "I/O Error",
                            e.to_string(),
                            span,
                        )))
                    })?;
                }
            }

            let hash = writer.commit().map_err(|e| {
                Box::new(ShellError::Generic(GenericError::new(
                    "I/O Error",
                    e.to_string(),
                    span,
                )))
            })?;

            Ok(Some(hash))
        }
        PipelineData::Empty => Ok(None),
    }
}