sonic 0.6.1

client library for Sonicd, a data streaming gateway
use std::collections::BTreeMap;
use serde_json::Value;
use error::{Result, ErrorKind};
use super::*;

#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq)]
pub enum MessageKind {
    #[serde(rename="A")]
    AcknowledgeKind,
    #[serde(rename="S")]
    StreamStartedKind,
    #[serde(rename="Q")]
    QueryKind,
    #[serde(rename="H")]
    AuthKind,
    #[serde(rename="T")]
    TypeMetadataKind,
    #[serde(rename="P")]
    ProgressKind,
    #[serde(rename="O")]
    OutputKind,
    #[serde(rename="D")]
    StreamCompletedKind,
}


#[derive(Serialize, Deserialize, Debug)]
pub struct ProtoSonicMessage {
    #[serde(rename="e")]
    pub event_type: MessageKind,
    #[serde(rename="v")]
    pub variation: Option<String>,
    #[serde(rename="p")]
    pub payload: Option<Value>,
}

impl From<SonicMessage> for ProtoSonicMessage {
    fn from(msg: SonicMessage) -> Self {
        match msg {
            SonicMessage::QueryMsg(Query{ config, query, auth, trace_id, .. /* ID is used only internally */ }) => {
                let mut payload = BTreeMap::new();

                payload.insert("config".to_owned(), config);

                payload.insert("auth".to_owned(), auth.map(|s| Value::String(s))
                               .unwrap_or_else(|| Value::Null));

                payload.insert("trace_id".to_owned(), trace_id.map(|s| Value::String(s))
                               .unwrap_or_else(|| Value::Null));

                ProtoSonicMessage {
                    event_type: MessageKind::QueryKind,
                    variation: Some(query),
                    payload: Some(Value::Object(payload)),
                }
            },
            SonicMessage::Acknowledge => {
                ProtoSonicMessage {
                    event_type: MessageKind::AcknowledgeKind,
                    variation: None,
                    payload: None,
                }
            },
            SonicMessage::AuthenticateMsg(Authenticate{ user, key, trace_id }) => {
                let mut payload = BTreeMap::new();

                payload.insert("user".to_owned(), Value::String(user));

                payload.insert("trace_id".to_owned(), trace_id.map(|s| Value::String(s))
                               .unwrap_or_else(|| Value::Null));

                ProtoSonicMessage {
                    event_type: MessageKind::AuthKind,
                    variation: Some(key),
                    payload: Some(Value::Object(payload)),
                }
            },
            SonicMessage::TypeMetadata(meta) => {
                ProtoSonicMessage {
                    event_type: MessageKind::TypeMetadataKind,
                    variation: None,
                    payload: Some(::serde_json::to_value(&meta)),
                }
            },
            SonicMessage::QueryProgress{ progress, total, units, .. } => {
                let mut payload = BTreeMap::new();

                payload.insert("p".to_owned(), Value::F64(progress));

                units.map(|u| payload.insert("u".to_owned(), Value::String(u)));
                total.map(|t| payload.insert("t".to_owned(), Value::F64(t)));

                ProtoSonicMessage {
                    event_type: MessageKind::ProgressKind,
                    variation: None,
                    payload: Some(Value::Object(payload)),
                }
            },
            SonicMessage::OutputChunk(data) => {
                ProtoSonicMessage {
                    event_type: MessageKind::OutputKind,
                    variation: None,
                    payload: Some(Value::Array(data)),
                }
            },
            SonicMessage::StreamStarted(trace_id) => {
                ProtoSonicMessage {
                    event_type: MessageKind::StreamStartedKind,
                    variation: Some(trace_id),
                    payload: None,
                }
            },
            SonicMessage::StreamCompleted(res, trace_id) => {
                let mut payload = BTreeMap::new();

                payload.insert("trace_id".to_owned(), Value::String(trace_id));
                ProtoSonicMessage {
                    event_type: MessageKind::StreamCompletedKind,
                    variation: res,
                    payload: Some(Value::Object(payload)),
                }
            }
        }
    }
}

fn get_payload(payload: Option<Value>) -> Result<BTreeMap<String, Value>> {
    match payload {
        Some(Value::Object(p)) => Ok(p),
        _ => try!(Err(ErrorKind::Proto("msg payload is empty".to_owned()))),
    }
}

impl ProtoSonicMessage {
    pub fn into_msg(self) -> Result<SonicMessage> {
        let ProtoSonicMessage { event_type, variation, payload } = self;

        match event_type {
            MessageKind::AcknowledgeKind => Ok(SonicMessage::Acknowledge),
            MessageKind::AuthKind => {
                let payload = try!(get_payload(payload));
                let user = try!(payload.get("user")
                                .and_then(|s| s.as_str().map(|s| s.to_owned()))
                                .ok_or_else(|| ErrorKind::Proto("missing user field in payload".to_owned())));
                let key = try!(payload.get("key")
                               .and_then(|s| s.as_str().map(|s| s.to_owned()))
                               .ok_or_else(|| ErrorKind::Proto("missing key field in payload".to_owned())));
                let trace_id = payload.get("trace_id").and_then(|s| s.as_str().map(|s| s.to_owned()));

                Ok(SonicMessage::AuthenticateMsg(Authenticate {
                    user: user,
                    key: key,
                    trace_id: trace_id,
                }))
            },
            MessageKind::TypeMetadataKind => {
                let payload = try!(payload
                                   .ok_or_else(|| ErrorKind::Proto("msg payload is empty".to_owned())));

                let data = try!(::serde_json::from_value(payload));
                Ok(SonicMessage::TypeMetadata(data))
            },
            MessageKind::ProgressKind => {
                let payload = try!(get_payload(payload));

                let total = payload.get("t").and_then(|s| s.as_f64());

                let js = try!(payload.get("s")
                              .ok_or_else(|| ErrorKind::Proto("missing query status in payload".to_owned())));

                let status = match js {
                    &Value::U64(0) => QueryStatus::Queued,
                    &Value::U64(1) => QueryStatus::Started,
                    &Value::U64(2) => QueryStatus::Running,
                    &Value::U64(3) => QueryStatus::Waiting,
                    &Value::U64(4) => QueryStatus::Finished,
                    s => {
                        return Err(ErrorKind::Proto(format!("unexpected query status {:?}", s)).into());
                    }
                };

                let progress = try!(payload.get("p")
                                    .and_then(|s| s.as_f64())
                                    .ok_or_else(|| ErrorKind::Proto("progress not found in payload".to_owned())));

                let units = payload.get("u").and_then(|s| s.as_str().map(|s| s.to_owned()));

                Ok(SonicMessage::QueryProgress {
                    progress: progress,
                    status: status,
                    total: total,
                    units: units,
                })
            },
            MessageKind::OutputKind => {
                match payload {
                    Some(Value::Array(data)) => Ok(SonicMessage::OutputChunk(data)),
                    s => try!(Err(ErrorKind::Proto(format!("payload is not an array: {:?}", s)))),
                }
            },
            MessageKind::StreamStartedKind => {
                let trace_id = try!(variation
                                 .ok_or_else(|| ErrorKind::Proto("StreamStarted 'variation' field is empty".to_owned())));

                Ok(SonicMessage::StreamStarted(trace_id))
            },
            MessageKind::StreamCompletedKind => {
                let payload = try!(get_payload(payload));

                let trace_id = try!(payload.get("trace_id")
                    .and_then(|s| s.as_str().map(|s| s.to_owned()))
                    .ok_or_else(|| ErrorKind::Proto("missing 'trace_id' in payload".to_owned())));

                Ok(SonicMessage::StreamCompleted(variation, trace_id))
            },
            MessageKind::QueryKind => {
                let payload = try!(get_payload(payload));

                let trace_id = payload.get("trace_id")
                    .and_then(|t| t.as_str().map(|t| t.to_owned()));

                let auth_token = payload.get("auth")
                    .and_then(|a| a.as_str().map(|a| a.to_owned()));

                let query = try!(variation
                                 .ok_or_else(|| ErrorKind::Proto("msg variation is empty".to_owned())));

                let config = try!(payload.get("config")
                                  .map(|c| c.to_owned())
                                  .ok_or_else(|| {
                                      ErrorKind::Proto("missing 'config' in query message payload".to_owned())
                                  }));

                Ok(SonicMessage::QueryMsg(Query{
                    id: None,
                    trace_id: trace_id,
                    query: query,
                    auth: auth_token,
                    config: config,
                }))
            },
        }
    }
}