use std::io::Read;
use std::io::Write;
use chrono::{DateTime, Utc};
use nu_protocol::shell_error::generic::GenericError;
use nu_protocol::{PipelineData, Record, ShellError, Span, Value};
use crate::store::Frame;
use crate::store::Store;
pub fn json_to_value(json: &serde_json::Value, span: Span) -> Value {
match json {
serde_json::Value::Null => Value::nothing(span),
serde_json::Value::Bool(b) => Value::bool(*b, span),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
Value::int(i, span)
} else if let Some(f) = n.as_f64() {
Value::float(f, span)
} else {
Value::string(n.to_string(), span)
}
}
serde_json::Value::String(s) => Value::string(s, span),
serde_json::Value::Array(arr) => {
let values: Vec<Value> = arr.iter().map(|v| json_to_value(v, span)).collect();
Value::list(values, span)
}
serde_json::Value::Object(obj) => {
let mut record = Record::new();
for (k, v) in obj {
record.push(k, json_to_value(v, span));
}
Value::record(record, span)
}
}
}
pub fn frame_to_value(frame: &Frame, span: Span, with_timestamp: bool) -> Value {
let mut record = Record::new();
record.push("id", Value::string(frame.id.to_string(), span));
record.push("topic", Value::string(frame.topic.clone(), span));
if let Some(hash) = &frame.hash {
record.push("hash", Value::string(hash.to_string(), span));
}
if let Some(meta) = &frame.meta {
record.push("meta", json_to_value(meta, span));
}
if let Some(ttl) = &frame.ttl {
let ttl_str = match ttl {
crate::store::TTL::Forever => "forever".to_string(),
crate::store::TTL::Ephemeral => "ephemeral".to_string(),
crate::store::TTL::Time(duration) => format!("{}s", duration.as_secs()),
crate::store::TTL::Last(n) => format!("last:{n}"),
};
record.push("ttl", Value::string(ttl_str, span));
}
if with_timestamp {
let millis = frame.id.timestamp() as i64;
let dt: DateTime<Utc> = DateTime::from_timestamp_millis(millis)
.unwrap_or_else(|| DateTime::from_timestamp(0, 0).unwrap());
record.push("timestamp", Value::date(dt.fixed_offset(), span));
}
Value::record(record, span)
}
/// Wraps the record produced by `frame_to_value` in a `PipelineData::Value`.
///
/// The record is tagged with `Span::unknown()` and no pipeline metadata is
/// attached. `with_timestamp` is forwarded unchanged.
pub fn frame_to_pipeline(frame: &Frame, with_timestamp: bool) -> PipelineData {
    let value = frame_to_value(frame, Span::unknown(), with_timestamp);
    PipelineData::Value(value, None)
}
pub fn value_to_json(value: &Value) -> serde_json::Value {
match value {
Value::Nothing { .. } => serde_json::Value::Null,
Value::Bool { val, .. } => serde_json::Value::Bool(*val),
Value::Int { val, .. } => serde_json::Value::Number((*val).into()),
Value::Float { val, .. } => serde_json::Number::from_f64(*val)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
Value::String { val, .. } => serde_json::Value::String(val.clone()),
Value::Filesize { val, .. } => serde_json::Value::Number(val.get().into()),
Value::Date { val, .. } => serde_json::Value::String(val.to_rfc3339()),
Value::List { vals, .. } => {
serde_json::Value::Array(vals.iter().map(value_to_json).collect())
}
Value::Record { val, .. } => {
let mut map = serde_json::Map::new();
for (k, v) in val.iter() {
map.insert(k.clone(), value_to_json(v));
}
serde_json::Value::Object(map)
}
_ => serde_json::Value::Null,
}
}
pub fn write_pipeline_to_cas(
input: PipelineData,
store: &Store,
span: Span,
) -> Result<Option<ssri::Integrity>, Box<ShellError>> {
let mut writer = store.cas_writer_sync().map_err(|e| {
Box::new(ShellError::Generic(GenericError::new(
"I/O Error",
e.to_string(),
span,
)))
})?;
match input {
PipelineData::Value(value, _) => match value {
Value::Nothing { .. } => Ok(None),
Value::String { val, .. } => {
writer.write_all(val.as_bytes()).map_err(|e| {
Box::new(ShellError::Generic(GenericError::new(
"I/O Error",
e.to_string(),
span,
)))
})?;
let hash = writer.commit().map_err(|e| {
Box::new(ShellError::Generic(GenericError::new(
"I/O Error",
e.to_string(),
span,
)))
})?;
Ok(Some(hash))
}
Value::Binary { val, .. } => {
writer.write_all(&val).map_err(|e| {
Box::new(ShellError::Generic(GenericError::new(
"I/O Error",
e.to_string(),
span,
)))
})?;
let hash = writer.commit().map_err(|e| {
Box::new(ShellError::Generic(GenericError::new(
"I/O Error",
e.to_string(),
span,
)))
})?;
Ok(Some(hash))
}
Value::Record { .. } => {
let json = value_to_json(&value);
let json_string = serde_json::to_string(&json).map_err(|e| {
Box::new(ShellError::Generic(GenericError::new(
"I/O Error",
e.to_string(),
span,
)))
})?;
writer.write_all(json_string.as_bytes()).map_err(|e| {
Box::new(ShellError::Generic(GenericError::new(
"I/O Error",
e.to_string(),
span,
)))
})?;
let hash = writer.commit().map_err(|e| {
Box::new(ShellError::Generic(GenericError::new(
"I/O Error",
e.to_string(),
span,
)))
})?;
Ok(Some(hash))
}
_ => Err(Box::new(ShellError::PipelineMismatch {
exp_input_type: format!(
"expected: string, binary, record, or nothing :: received: {typ:?}",
typ = value.get_type()
),
dst_span: span,
src_span: value.span(),
})),
},
PipelineData::ListStream(_stream, ..) => {
panic!("ListStream handling is not yet implemented");
}
PipelineData::ByteStream(stream, ..) => {
if let Some(mut reader) = stream.reader() {
let mut buffer = [0; 8192];
loop {
let bytes_read = reader.read(&mut buffer).map_err(|e| {
Box::new(ShellError::Generic(GenericError::new(
"I/O Error",
e.to_string(),
span,
)))
})?;
if bytes_read == 0 {
break;
}
writer.write_all(&buffer[..bytes_read]).map_err(|e| {
Box::new(ShellError::Generic(GenericError::new(
"I/O Error",
e.to_string(),
span,
)))
})?;
}
}
let hash = writer.commit().map_err(|e| {
Box::new(ShellError::Generic(GenericError::new(
"I/O Error",
e.to_string(),
span,
)))
})?;
Ok(Some(hash))
}
PipelineData::Empty => Ok(None),
}
}