use std::collections::BTreeMap;
use serde_json::Value;
use error::{Result, ErrorKind};
use super::*;
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq)]
pub enum MessageKind {
#[serde(rename="A")]
AcknowledgeKind,
#[serde(rename="S")]
StreamStartedKind,
#[serde(rename="Q")]
QueryKind,
#[serde(rename="H")]
AuthKind,
#[serde(rename="T")]
TypeMetadataKind,
#[serde(rename="P")]
ProgressKind,
#[serde(rename="O")]
OutputKind,
#[serde(rename="D")]
StreamCompletedKind,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct ProtoSonicMessage {
#[serde(rename="e")]
pub event_type: MessageKind,
#[serde(rename="v")]
pub variation: Option<String>,
#[serde(rename="p")]
pub payload: Option<Value>,
}
impl From<SonicMessage> for ProtoSonicMessage {
fn from(msg: SonicMessage) -> Self {
match msg {
SonicMessage::QueryMsg(Query{ config, query, auth, trace_id, .. /* ID is used only internally */ }) => {
let mut payload = BTreeMap::new();
payload.insert("config".to_owned(), config);
payload.insert("auth".to_owned(), auth.map(|s| Value::String(s))
.unwrap_or_else(|| Value::Null));
payload.insert("trace_id".to_owned(), trace_id.map(|s| Value::String(s))
.unwrap_or_else(|| Value::Null));
ProtoSonicMessage {
event_type: MessageKind::QueryKind,
variation: Some(query),
payload: Some(Value::Object(payload)),
}
},
SonicMessage::Acknowledge => {
ProtoSonicMessage {
event_type: MessageKind::AcknowledgeKind,
variation: None,
payload: None,
}
},
SonicMessage::AuthenticateMsg(Authenticate{ user, key, trace_id }) => {
let mut payload = BTreeMap::new();
payload.insert("user".to_owned(), Value::String(user));
payload.insert("trace_id".to_owned(), trace_id.map(|s| Value::String(s))
.unwrap_or_else(|| Value::Null));
ProtoSonicMessage {
event_type: MessageKind::AuthKind,
variation: Some(key),
payload: Some(Value::Object(payload)),
}
},
SonicMessage::TypeMetadata(meta) => {
ProtoSonicMessage {
event_type: MessageKind::TypeMetadataKind,
variation: None,
payload: Some(::serde_json::to_value(&meta)),
}
},
SonicMessage::QueryProgress{ progress, total, units, .. } => {
let mut payload = BTreeMap::new();
payload.insert("p".to_owned(), Value::F64(progress));
units.map(|u| payload.insert("u".to_owned(), Value::String(u)));
total.map(|t| payload.insert("t".to_owned(), Value::F64(t)));
ProtoSonicMessage {
event_type: MessageKind::ProgressKind,
variation: None,
payload: Some(Value::Object(payload)),
}
},
SonicMessage::OutputChunk(data) => {
ProtoSonicMessage {
event_type: MessageKind::OutputKind,
variation: None,
payload: Some(Value::Array(data)),
}
},
SonicMessage::StreamStarted(trace_id) => {
ProtoSonicMessage {
event_type: MessageKind::StreamStartedKind,
variation: Some(trace_id),
payload: None,
}
},
SonicMessage::StreamCompleted(res, trace_id) => {
let mut payload = BTreeMap::new();
payload.insert("trace_id".to_owned(), Value::String(trace_id));
ProtoSonicMessage {
event_type: MessageKind::StreamCompletedKind,
variation: res,
payload: Some(Value::Object(payload)),
}
}
}
}
}
fn get_payload(payload: Option<Value>) -> Result<BTreeMap<String, Value>> {
match payload {
Some(Value::Object(p)) => Ok(p),
_ => try!(Err(ErrorKind::Proto("msg payload is empty".to_owned()))),
}
}
impl ProtoSonicMessage {
pub fn into_msg(self) -> Result<SonicMessage> {
let ProtoSonicMessage { event_type, variation, payload } = self;
match event_type {
MessageKind::AcknowledgeKind => Ok(SonicMessage::Acknowledge),
MessageKind::AuthKind => {
let payload = try!(get_payload(payload));
let user = try!(payload.get("user")
.and_then(|s| s.as_str().map(|s| s.to_owned()))
.ok_or_else(|| ErrorKind::Proto("missing user field in payload".to_owned())));
let key = try!(payload.get("key")
.and_then(|s| s.as_str().map(|s| s.to_owned()))
.ok_or_else(|| ErrorKind::Proto("missing key field in payload".to_owned())));
let trace_id = payload.get("trace_id").and_then(|s| s.as_str().map(|s| s.to_owned()));
Ok(SonicMessage::AuthenticateMsg(Authenticate {
user: user,
key: key,
trace_id: trace_id,
}))
},
MessageKind::TypeMetadataKind => {
let payload = try!(payload
.ok_or_else(|| ErrorKind::Proto("msg payload is empty".to_owned())));
let data = try!(::serde_json::from_value(payload));
Ok(SonicMessage::TypeMetadata(data))
},
MessageKind::ProgressKind => {
let payload = try!(get_payload(payload));
let total = payload.get("t").and_then(|s| s.as_f64());
let js = try!(payload.get("s")
.ok_or_else(|| ErrorKind::Proto("missing query status in payload".to_owned())));
let status = match js {
&Value::U64(0) => QueryStatus::Queued,
&Value::U64(1) => QueryStatus::Started,
&Value::U64(2) => QueryStatus::Running,
&Value::U64(3) => QueryStatus::Waiting,
&Value::U64(4) => QueryStatus::Finished,
s => {
return Err(ErrorKind::Proto(format!("unexpected query status {:?}", s)).into());
}
};
let progress = try!(payload.get("p")
.and_then(|s| s.as_f64())
.ok_or_else(|| ErrorKind::Proto("progress not found in payload".to_owned())));
let units = payload.get("u").and_then(|s| s.as_str().map(|s| s.to_owned()));
Ok(SonicMessage::QueryProgress {
progress: progress,
status: status,
total: total,
units: units,
})
},
MessageKind::OutputKind => {
match payload {
Some(Value::Array(data)) => Ok(SonicMessage::OutputChunk(data)),
s => try!(Err(ErrorKind::Proto(format!("payload is not an array: {:?}", s)))),
}
},
MessageKind::StreamStartedKind => {
let trace_id = try!(variation
.ok_or_else(|| ErrorKind::Proto("StreamStarted 'variation' field is empty".to_owned())));
Ok(SonicMessage::StreamStarted(trace_id))
},
MessageKind::StreamCompletedKind => {
let payload = try!(get_payload(payload));
let trace_id = try!(payload.get("trace_id")
.and_then(|s| s.as_str().map(|s| s.to_owned()))
.ok_or_else(|| ErrorKind::Proto("missing 'trace_id' in payload".to_owned())));
Ok(SonicMessage::StreamCompleted(variation, trace_id))
},
MessageKind::QueryKind => {
let payload = try!(get_payload(payload));
let trace_id = payload.get("trace_id")
.and_then(|t| t.as_str().map(|t| t.to_owned()));
let auth_token = payload.get("auth")
.and_then(|a| a.as_str().map(|a| a.to_owned()));
let query = try!(variation
.ok_or_else(|| ErrorKind::Proto("msg variation is empty".to_owned())));
let config = try!(payload.get("config")
.map(|c| c.to_owned())
.ok_or_else(|| {
ErrorKind::Proto("missing 'config' in query message payload".to_owned())
}));
Ok(SonicMessage::QueryMsg(Query{
id: None,
trace_id: trace_id,
query: query,
auth: auth_token,
config: config,
}))
},
}
}
}