1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
//! The library allows to send data to carbon (also known as graphite)
//!
//! #Example
//!
//! ```ignore
//! extern crate rotor;
//! extern crate rotor_tools;
//!
//! use rotor_tools::loop_ext::LoopExt;
//!
//! let loopinst = rotor::Loop::new(&rotor::Config::new()).instantiate(());
//! let sink = loopinst.add_and_fetch(|scope| {
//!     connect_ip("127.0.0.1:2003".parse().unwrap(), scope)
//! }).unwrap();
//!
//! loopinst.run().unwrap();
//!
//! // Then somewhere else:
//!
//! { // Note sender keeps lock on buffer for it's lifetime
//!     let sender = sink.sender();
//!     sender.add_u64("some.value", 1234);
//! }
//! ```
//!
//! If you need to format the metric name, use `format_args!` instead of
//! `format!` as the former does not preallocate a buffer:
//! ```
//! snd.add_int_at(
//!     format_args!("servers.{}.metrics.{}", hostname, metric),
//!     metric_value, timestamp);
//! ```
//!
extern crate rotor;
extern crate rotor_stream;
extern crate rotor_tools;
extern crate time;
extern crate num;
#[macro_use] extern crate log;

mod sink;
mod sender;
mod proto;

use std::net::SocketAddr;
use std::sync::{Arc, Mutex, MutexGuard};

use rotor::{GenericScope, Notifier, Void, Response};
use rotor::mio::tcp::TcpStream;
use rotor_stream::{Persistent, ActiveStream};
use rotor_tools::sync::{Mutexed};


/// A state machine object, just add in to the loop
pub type Fsm<C, S> = Mutexed<Persistent<proto::CarbonProto<C, S>>>;

/// This is a wrapper around the machinery to send data
///
/// Use ``sink.sender()`` go to get an actual object you may send to
///
/// Note ``sink.sender()`` holds lock on the underlying buffer and doesn't
/// send data, until sender is dropped. This is useful for sending data in
/// single bulk.
#[derive(Clone)]
pub struct Sink<C, S>(Arc<Mutex<Persistent<proto::CarbonProto<C, S>>>>,
                      Notifier)
    where S: ActiveStream;

/// The sender object, which has convenience methods to send the data
///
/// Note ``Sender()`` holds lock on the underlying buffer and doesn't
/// send data, until sender is dropped. This is useful for sending data in
/// single bulk.
pub struct Sender<'a, C: 'a, S>(
    MutexGuard<'a, Persistent<proto::CarbonProto<C, S>>>,
    Option<Notifier>)
    where S: ActiveStream;

/// Connect to the socket by IP address
///
/// The method is here while rotor-dns is not matured yet. The better way
/// would be to use dns resolving.
pub fn connect_ip<S: GenericScope, C>(addr: SocketAddr, scope: &mut S)
    -> Response<(Fsm<C, TcpStream>, Sink<C, TcpStream>), Void>
{
    Persistent::connect(scope, addr, ()).wrap(|fsm| {
        let arc = Arc::new(Mutex::new(fsm));
        (Mutexed(arc.clone()), Sink(arc, scope.notifier()))
    })
}