1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
//! Send record as msgpack.
//!
//! ## Usage
//!
//! This trait is used as follows:
//!
//! ```no_run
//! extern crate fruently;
//! use fruently::fluent::Fluent;
//! use std::collections::HashMap;
//! use fruently::forwardable::MsgpackForwardable;
//!
//! fn main() {
//!     let mut obj: HashMap<String, String> = HashMap::new();
//!     obj.insert("name".to_string(), "fruently".to_string());
//!     let fruently = Fluent::new("0.0.0.0:24224", "test");
//!     let _ = fruently.post(&obj);
//! }
//! ```

use std::net::ToSocketAddrs;
use rustc_serialize::Encodable;
use time;
use retry::retry_exponentially;
use record::Record;
use record::FluentError;
use forwardable::MsgpackForwardable;
use fluent::Fluent;

#[derive(Debug, Clone, PartialEq, Eq, RustcEncodable, RustcDecodable)]
pub struct Message<T: Encodable> {
    tag: String,
    timesec: i64,
    record: T,
}

impl<T: Encodable> Message<T> {
    pub fn new(tag: String, timesec: i64, record: T) -> Message<T> {
        Message {
            tag: tag,
            timesec: timesec,
            record: record,
        }
    }
}

impl<A: ToSocketAddrs> MsgpackForwardable for Fluent<A> {
    /// Post record into Fluentd. Without time version.
    fn post<T>(self, record: T) -> Result<(), FluentError>
        where T: Encodable
    {
        let time = time::now();
        return self.post_with_time(record, time);
    }

    /// Post record into Fluentd. With time version.
    fn post_with_time<T>(self, record: T, time: time::Tm) -> Result<(), FluentError>
        where T: Encodable
    {
        let record = Record::new(self.get_tag(), time, record);
        let message = record.to_message();
        let addr = self.get_addr();
        let (max, multiplier) = self.get_conf().build();
        match retry_exponentially(max,
                                  multiplier,
                                  || Fluent::closure_send_as_msgpack(addr, &message),
                                  |response| response.is_ok()) {
            Ok(_) => Ok(()),
            Err(v) => Err(From::from(v)),
        }
    }
}

#[cfg(test)]
mod tests {
    #[cfg(feature="fluentd")]
    use time;
    #[cfg(feature="fluentd")]
    use fluent::Fluent;

    #[test]
    #[cfg(feature="fluentd")]
    fn test_post() {
        use std::collections::HashMap;
        use forwardable::MsgpackForwardable;

        let fruently = Fluent::new("0.0.0.0:24224", "test");
        let mut obj: HashMap<String, String> = HashMap::new();
        obj.insert("hey".to_string(), "Rust with msgpack!".to_string());
        let result = fruently.post(obj).is_ok();
        assert_eq!(true, result);
    }

    #[test]
    #[cfg(feature="fluentd")]
    fn test_post_with_time() {
        use std::collections::HashMap;
        use forwardable::MsgpackForwardable;

        let fruently = Fluent::new("0.0.0.0:24224", "test");
        let mut obj: HashMap<String, String> = HashMap::new();
        obj.insert("hey".to_string(), "Rust with msgpack!".to_string());
        let time = time::now();
        let result = fruently.post_with_time(obj, time).is_ok();
        assert_eq!(true, result);
    }
}