use failure;
use serde;
use serde::ser::SerializeMap;
use serde_json;
use std::collections::HashMap;
use crate::{Message, Logger, MessageCompression, ChunkSize};
use crate::errors::Result;
use crate::errors::Error;
use crate::message::ChunkedMessage;
pub struct WireMessage<'a> {
host: &'a str,
message: Message<'a>,
}
impl<'a> WireMessage<'a> {
pub fn new(mut msg: Message<'a>, logger: &'a Logger) -> Self {
let additionals_from_default: HashMap<&String, &String> = logger
.default_metadata()
.iter()
.filter(|&(key, _)| !msg.metadata.contains_key(key.as_str()))
.collect();
for (key, value) in additionals_from_default {
msg.set_metadata(key.as_str(), value.as_str()).ok();
}
WireMessage {
host: logger.hostname(),
message: msg,
}
}
pub fn to_gelf(&self) -> Result<String> {
serde_json::to_string(self).map_err(|e| {
failure::Error::from(e)
.context(Error::SerializeMessageFailed)
.into()
})
}
pub fn to_compressed_gelf(&self, compression: MessageCompression) -> Result<Vec<u8>> {
compression.compress(&self)
}
pub fn to_chunked_message(
&self,
chunk_size: ChunkSize,
compression: MessageCompression,
) -> Result<ChunkedMessage> {
ChunkedMessage::new(chunk_size, self.to_compressed_gelf(compression)?)
}
}
impl<'a> serde::Serialize for WireMessage<'a> {
fn serialize<S>(&self, serializer: S) -> ::std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut map = serializer.serialize_map(None)?;
map.serialize_key("version")?;
map.serialize_value("1.1")?;
map.serialize_key("host")?;
map.serialize_value(self.host)?;
map.serialize_key("short_message")?;
map.serialize_value(&self.message.short_message())?;
map.serialize_key("level")?;
let level = self.message.level as u8;
map.serialize_value(&level)?;
if self.message.full_message().is_some() {
map.serialize_key("full_message")?;
map.serialize_value(&self.message.full_message())?;
}
if let Some(datetime) = self.message.timestamp() {
let value = datetime.timestamp_millis() as f64 / 1000.0;
map.serialize_key("timestamp")?;
map.serialize_value(&value)?;
}
for (key, value) in self.message.all_metadata().iter() {
let key = "_".to_string() + key;
map.serialize_key(&key)?;
map.serialize_value(value)?;
}
map.end()
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{TimeZone, Utc};
use crate::Level;
#[test]
fn wire_message_serialization() {
let mut message = Message::new_with_level("short", Level::Alert);
message.set_full_message("full");
let datetime = Utc.ymd(2000, 1, 1).and_hms_micro(1, 2, 3, 12_345);
message.set_timestamp(datetime);
message.set_metadata("key1", "value1").unwrap();
message.set_metadata("key2", "value2").unwrap();
let wire_msg = WireMessage {
host: "host_value",
message,
};
let json = serde_json::to_value(wire_msg).expect("Failed to serialize WireMessage");
assert_eq!(Some(json!("1.1")), json.get("version").cloned());
assert_eq!(Some(json!("host_value")), json.get("host").cloned());
assert_eq!(Some(json!("short")), json.get("short_message").cloned());
assert_eq!(Some(json!("full")), json.get("full_message").cloned());
assert_eq!(Some(json!(1)), json.get("level").cloned());
let timestamp_secs = datetime.timestamp();
let expected_timestamp = timestamp_secs as f64 + 0.012;
assert_eq!(
Some(json!(&expected_timestamp)),
json.get("timestamp").cloned()
);
assert_eq!(Some(json!("value1")), json.get("_key1").cloned());
assert_eq!(Some(json!("value2")), json.get("_key2").cloned());
}
}