datadog_statsd/
client.rs

1use std::collections::VecDeque;
2use std::error;
3use std::fmt;
4use std::io::Error;
5use std::net::AddrParseError;
6use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
7use std::time;
8
9extern crate rand;
10
11#[derive(Debug)]
12pub enum StatsdError {
13    IoError(Error),
14    AddrParseError(String),
15}
16
17impl From<AddrParseError> for StatsdError {
18    fn from(_: AddrParseError) -> StatsdError {
19        StatsdError::AddrParseError("Address parsing error".to_string())
20    }
21}
22
23impl From<Error> for StatsdError {
24    fn from(err: Error) -> StatsdError {
25        StatsdError::IoError(err)
26    }
27}
28
29impl fmt::Display for StatsdError {
30    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
31        match *self {
32            StatsdError::IoError(ref e) => write!(f, "{}", e),
33            StatsdError::AddrParseError(ref e) => write!(f, "{}", e),
34        }
35    }
36}
37
38impl error::Error for StatsdError {}
39
40/// Client socket for statsd servers.
41///
42/// After creating a metric you can use `Client`
43/// to send metrics to the configured statsd server
44///
45/// # Example
46///
47/// Creating a client and sending metrics is easy.
48///
49/// ```ignore
50/// use datadog_statsd::client::Client;
51///
52/// let client = Client::new("127.0.0.1:8125", "myapp", tags);
53/// client.incr("some.metric.completed");
54/// ```
55pub struct Client {
56    socket: UdpSocket,
57    server_address: SocketAddr,
58    prefix: String,
59    constant_tags: Vec<String>,
60}
61
62impl Client {
63    /// Construct a new statsd client given an host/port & prefix
64    pub fn new<T: ToSocketAddrs>(
65        host: T,
66        prefix: &str,
67        constant_tags: Option<Vec<&str>>,
68    ) -> Result<Client, StatsdError> {
69        let server_address = host
70            .to_socket_addrs()?
71            .next()
72            .ok_or_else(|| StatsdError::AddrParseError("Address parsing error".to_string()))?;
73
74        // Bind to a generic port as we'll only be writing on this
75        // socket.
76        let socket = if server_address.is_ipv4() {
77            UdpSocket::bind("0.0.0.0:0")?
78        } else {
79            UdpSocket::bind("[::]:0")?
80        };
81        Ok(Client {
82            socket,
83            prefix: prefix.to_string(),
84            server_address,
85            constant_tags: match constant_tags {
86                Some(tags) => tags.iter().map(|x| x.to_string()).collect(),
87                None => vec![],
88            },
89        })
90    }
91
92    /// Increment a metric by 1
93    ///
94    /// ```ignore
95    /// # Increment a given metric by 1.
96    /// client.incr("metric.completed", tags);
97    /// ```
98    ///
99    /// This modifies a counter with an effective sampling
100    /// rate of 1.0.
101    pub fn incr(&self, metric: &str, tags: &Option<Vec<&str>>) {
102        self.count(metric, 1.0, tags);
103    }
104
105    /// Decrement a metric by -1
106    ///
107    /// ```ignore
108    /// # Decrement a given metric by 1
109    /// client.decr("metric.completed", tags);
110    /// ```
111    ///
112    /// This modifies a counter with an effective sampling
113    /// rate of 1.0.
114    pub fn decr(&self, metric: &str, tags: &Option<Vec<&str>>) {
115        self.count(metric, -1.0, tags);
116    }
117
118    /// Modify a counter by `value`.
119    ///
120    /// Will increment or decrement a counter by `value` with
121    /// a sampling rate of 1.0.
122    ///
123    /// ```ignore
124    /// // Increment by 12
125    /// client.count("metric.completed", 12.0, tags);
126    /// ```
127    pub fn count(&self, metric: &str, value: f64, tags: &Option<Vec<&str>>) {
128        let data = self.prepare_with_tags(format!("{}:{}|c", metric, value), tags);
129        self.send(data);
130    }
131
132    /// Modify a counter by `value` only x% of the time.
133    ///
134    /// Will increment or decrement a counter by `value` with
135    /// a custom sampling rate.
136    ///
137    ///
138    /// ```ignore
139    /// // Increment by 4 50% of the time.
140    /// client.sampled_count("metric.completed", 4, 0.5, tags);
141    /// ```
142    pub fn sampled_count(&self, metric: &str, value: f64, rate: f64, tags: &Option<Vec<&str>>) {
143        if rand::random::<f64>() >= rate {
144            return;
145        }
146        let data = self.prepare_with_tags(format!("{}:{}|c|@{}", metric, value, rate), tags);
147        self.send(data);
148    }
149
150    /// Set a gauge value.
151    ///
152    /// ```ignore
153    /// // set a gauge to 9001
154    /// client.gauge("power_level.observed", 9001.0, tags);
155    /// ```
156    pub fn gauge(&self, metric: &str, value: f64, tags: &Option<Vec<&str>>) {
157        let data = self.prepare_with_tags(format!("{}:{}|g", metric, value), tags);
158        self.send(data);
159    }
160
161    /// Send a timer value.
162    ///
163    /// The value is expected to be in ms.
164    ///
165    /// ```ignore
166    /// // pass a duration value
167    /// client.timer("response.duration", 10.123, tags);
168    /// ```
169    pub fn timer(&self, metric: &str, value: f64, tags: &Option<Vec<&str>>) {
170        let data = self.prepare_with_tags(format!("{}:{}|ms", metric, value), tags);
171        self.send(data);
172    }
173
174    /// Time a block of code.
175    ///
176    /// The passed closure will be timed and executed. The block's
177    /// duration will be sent as a metric.
178    ///
179    /// ```ignore
180    /// // pass a duration value
181    /// client.time("response.duration", tags, || {
182    ///   // Your code here.
183    /// });
184    /// ```
185    pub fn time<F, R>(&self, metric: &str, tags: &Option<Vec<&str>>, callable: F) -> R
186    where
187        F: FnOnce() -> R,
188    {
189        let start = time::Instant::now();
190        let return_val = callable();
191        let used = start.elapsed();
192        let data = self.prepare_with_tags(format!("{}:{}|ms", metric, used.as_millis()), tags);
193        self.send(data);
194        return_val
195    }
196
197    fn prepare<T: AsRef<str>>(&self, data: T) -> String {
198        if self.prefix.is_empty() {
199            data.as_ref().to_string()
200        } else {
201            format!("{}.{}", self.prefix, data.as_ref())
202        }
203    }
204
205    fn prepare_with_tags<T: AsRef<str>>(&self, data: T, tags: &Option<Vec<&str>>) -> String {
206        self.append_tags(self.prepare(data), tags)
207    }
208
209    fn append_tags<T: AsRef<str>>(&self, data: T, tags: &Option<Vec<&str>>) -> String {
210        if self.constant_tags.is_empty() && tags.is_none() {
211            data.as_ref().to_string()
212        } else {
213            let mut all_tags = self.constant_tags.clone();
214            match tags {
215                Some(v) => {
216                    for tag in v {
217                        all_tags.push(tag.to_string());
218                    }
219                }
220                None => {
221                    // nothing to do
222                }
223            }
224            format!("{}|#{}", data.as_ref(), all_tags.join(","))
225        }
226    }
227
228    /// Send data along the UDP socket.
229    fn send(&self, data: String) {
230        let _ = self.socket.send_to(data.as_bytes(), self.server_address);
231    }
232
233    /// Get a pipeline struct that allows optimizes the number of UDP
234    /// packets used to send multiple metrics
235    ///
236    /// ```ignore
237    /// let mut pipeline = client.pipeline();
238    /// pipeline.incr("some.metric", 1);
239    /// pipeline.incr("other.metric", 1);
240    /// pipeline.send(&mut client);
241    /// ```
242    pub fn pipeline(&self) -> Pipeline {
243        Pipeline::new()
244    }
245
246    /// Send a histogram value.
247    ///
248    /// ```ignore
249    /// // pass response size value
250    /// client.histogram("response.size", 128.0, tags);
251    /// ```
252    pub fn histogram(&self, metric: &str, value: f64, tags: &Option<Vec<&str>>) {
253        let data = self.prepare_with_tags(format!("{}:{}|h", metric, value), tags);
254        self.send(data);
255    }
256
257    /// Send a event.
258    ///
259    /// ```ignore
260    /// // pass a app start event
261    /// client.event("MyApp Start", "MyApp Details", AlertType::Info, &Some(vec!["tag1", "tag2:test"]));
262    /// ```
263    pub fn event(&self, title: &str, text: &str, alert_type: AlertType, tags: &Option<Vec<&str>>) {
264        let mut d = vec![];
265        d.push(format!("_e{{{},{}}}:{}", title.len(), text.len(), title));
266        d.push(text.to_string());
267        if alert_type != AlertType::Info {
268            d.push(format!("t:{}", alert_type.to_string().to_lowercase()))
269        }
270        let event_with_tags = self.append_tags(d.join("|"), tags);
271        self.send(event_with_tags)
272    }
273
274    /// Send a service check.
275    ///
276    /// ```ignore
277    /// // pass a app status
278    /// client.service_check("MyApp", ServiceCheckStatus::Ok, &Some(vec!["tag1", "tag2:test"]));
279    /// ```
280    pub fn service_check(
281        &self,
282        service_check_name: &str,
283        status: ServiceCheckStatus,
284        tags: &Option<Vec<&str>>,
285    ) {
286        let mut d = vec![];
287        let status_code = (status as u32).to_string();
288        d.push("_sc");
289        d.push(service_check_name);
290        d.push(&status_code);
291        let sc_with_tags = self.append_tags(d.join("|"), tags);
292        self.send(sc_with_tags)
293    }
294}
295
296#[derive(Clone, Debug, PartialEq, Eq)]
297pub enum AlertType {
298    Info,
299    Error,
300    Warning,
301    Success,
302}
303
304impl fmt::Display for AlertType {
305    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
306        write!(f, "{:?}", self)
307    }
308}
309
310#[derive(Clone, Debug, PartialEq, Eq)]
311pub enum ServiceCheckStatus {
312    Ok = 0,
313    Warning = 1,
314    Critical = 2,
315    Unknown = 3,
316}
317
318pub struct Pipeline {
319    stats: VecDeque<String>,
320    max_udp_size: usize,
321}
322
323impl Default for Pipeline {
324    fn default() -> Self {
325        Self::new()
326    }
327}
328
329impl Pipeline {
330    pub fn new() -> Pipeline {
331        Pipeline {
332            stats: VecDeque::new(),
333            max_udp_size: 512,
334        }
335    }
336
337    /// Set max UDP packet size
338    ///
339    /// ```
340    /// use datadog_statsd::client::Pipeline;
341    ///
342    /// let mut pipe = Pipeline::new();
343    /// pipe.set_max_udp_size(128);
344    /// ```
345    pub fn set_max_udp_size(&mut self, max_udp_size: usize) {
346        self.max_udp_size = max_udp_size;
347    }
348
349    /// Increment a metric by 1
350    ///
351    /// ```
352    /// use datadog_statsd::client::Pipeline;
353    ///
354    /// let mut pipe = Pipeline::new();
355    /// // Increment a given metric by 1.
356    /// pipe.incr("metric.completed");
357    /// ```
358    ///
359    /// This modifies a counter with an effective sampling
360    /// rate of 1.0.
361    pub fn incr(&mut self, metric: &str) {
362        self.count(metric, 1.0);
363    }
364
365    /// Decrement a metric by -1
366    ///
367    /// ```
368    /// use datadog_statsd::client::Pipeline;
369    ///
370    /// let mut pipe = Pipeline::new();
371    /// // Decrement a given metric by 1
372    /// pipe.decr("metric.completed");
373    /// ```
374    ///
375    /// This modifies a counter with an effective sampling
376    /// rate of 1.0.
377    pub fn decr(&mut self, metric: &str) {
378        self.count(metric, -1.0);
379    }
380
381    /// Modify a counter by `value`.
382    ///
383    /// Will increment or decrement a counter by `value` with
384    /// a sampling rate of 1.0.
385    ///
386    /// ```
387    /// use datadog_statsd::client::Pipeline;
388    ///
389    /// let mut pipe = Pipeline::new();
390    /// // Increment by 12
391    /// pipe.count("metric.completed", 12.0);
392    /// ```
393    pub fn count(&mut self, metric: &str, value: f64) {
394        let data = format!("{}:{}|c", metric, value);
395        self.stats.push_back(data);
396    }
397
398    /// Modify a counter by `value` only x% of the time.
399    ///
400    /// Will increment or decrement a counter by `value` with
401    /// a custom sampling rate.
402    ///
403    /// ```
404    /// use datadog_statsd::client::Pipeline;
405    ///
406    /// let mut pipe = Pipeline::new();
407    /// // Increment by 4 50% of the time.
408    /// pipe.sampled_count("metric.completed", 4.0, 0.5);
409    /// ```
410    pub fn sampled_count(&mut self, metric: &str, value: f64, rate: f64) {
411        if rand::random::<f64>() >= rate {
412            return;
413        }
414        let data = format!("{}:{}|c|@{}", metric, value, rate);
415        self.stats.push_back(data);
416    }
417
418    /// Set a gauge value.
419    ///
420    /// ```
421    /// use datadog_statsd::client::Pipeline;
422    ///
423    /// let mut pipe = Pipeline::new();
424    /// // set a gauge to 9001
425    /// pipe.gauge("power_level.observed", 9001.0);
426    /// ```
427    pub fn gauge(&mut self, metric: &str, value: f64) {
428        let data = format!("{}:{}|g", metric, value);
429        self.stats.push_back(data);
430    }
431
432    /// Send a timer value.
433    ///
434    /// The value is expected to be in ms.
435    ///
436    /// ```
437    /// use datadog_statsd::client::Pipeline;
438    ///
439    /// let mut pipe = Pipeline::new();
440    /// // pass a duration value
441    /// pipe.timer("response.duration", 10.123);
442    /// ```
443    pub fn timer(&mut self, metric: &str, value: f64) {
444        let data = format!("{}:{}|ms", metric, value);
445        self.stats.push_back(data);
446    }
447
448    /// Time a block of code.
449    ///
450    /// The passed closure will be timed and executed. The block's
451    /// duration will be sent as a metric.
452    ///
453    /// ```
454    /// use datadog_statsd::client::Pipeline;
455    ///
456    /// let mut pipe = Pipeline::new();
457    /// // pass a duration value
458    /// pipe.time("response.duration", || {
459    ///   // Your code here.
460    /// });
461    /// ```
462    pub fn time<F>(&mut self, metric: &str, callable: F)
463    where
464        F: FnOnce(),
465    {
466        let start = time::Instant::now();
467        callable();
468        let used = start.elapsed();
469        let data = format!("{}:{}|ms", metric, used.as_millis());
470        self.stats.push_back(data);
471    }
472
473    /// Send a histogram value.
474    ///
475    /// ```
476    /// use datadog_statsd::client::Pipeline;
477    ///
478    /// let mut pipe = Pipeline::new();
479    /// // pass response size value
480    /// pipe.histogram("response.size", 128.0);
481    /// ```
482    pub fn histogram(&mut self, metric: &str, value: f64) {
483        let data = format!("{}:{}|h", metric, value);
484        self.stats.push_back(data);
485    }
486
487    /// Send data along the UDP socket.
488    pub fn send(&mut self, client: &Client) {
489        let mut _data = String::new();
490        if let Some(data) = self.stats.pop_front() {
491            _data += client.prepare(&data).as_ref();
492            while !self.stats.is_empty() {
493                let stat = client.prepare(self.stats.pop_front().unwrap());
494                if data.len() + stat.len() + 1 > self.max_udp_size {
495                    client.send(_data.clone());
496                    _data.clear();
497                    _data += &stat;
498                } else {
499                    _data += "\n";
500                    _data += &stat;
501                }
502            }
503        }
504        if !_data.is_empty() {
505            client.send(_data);
506        }
507    }
508}
509
510#[cfg(test)]
511mod test {
512    extern crate rand;
513    use self::rand::distributions::{IndependentSample, Range};
514    use super::*;
515    use std::net::UdpSocket;
516    use std::str;
517    use std::sync::mpsc::sync_channel;
518    use std::thread;
519
520    static PORT: u16 = 8125;
521
522    // Generates random ports.
523    // Having random ports helps tests not collide over
524    // shared ports.
525    fn next_test_ip4() -> String {
526        let range = Range::new(0, 1000);
527        let mut rng = rand::thread_rng();
528        let port = PORT + range.ind_sample(&mut rng);
529        format!("127.0.0.1:{}", port)
530    }
531
532    // Makes a udpsocket that acts as a statsd server.
533    fn make_server(host: &str) -> UdpSocket {
534        UdpSocket::bind(host).ok().unwrap()
535    }
536
537    fn server_recv(server: UdpSocket) -> String {
538        let (serv_tx, serv_rx) = sync_channel(1);
539        let _t = thread::spawn(move || {
540            let mut buf = [0; 128];
541            let (len, _) = match server.recv_from(&mut buf) {
542                Ok(r) => r,
543                Err(_) => panic!("No response from test server."),
544            };
545            drop(server);
546            let bytes = Vec::from(&buf[0..len]);
547            serv_tx.send(bytes).unwrap();
548        });
549
550        let bytes = serv_rx.recv().ok().unwrap();
551        str::from_utf8(&bytes).unwrap().to_string()
552    }
553
554    #[test]
555    fn test_sending_gauge() {
556        let host = next_test_ip4();
557        let server = make_server(&host);
558        let client = Client::new(&host, "myapp", None).unwrap();
559
560        client.gauge("metric", 9.1, &None);
561
562        let response = server_recv(server);
563        assert_eq!("myapp.metric:9.1|g", response);
564    }
565
566    #[test]
567    fn test_sending_gauge_without_prefix() {
568        let host = next_test_ip4();
569        let server = make_server(&host);
570        let client = Client::new(&host, "", None).unwrap();
571
572        client.gauge("metric", 9.1, &None);
573
574        let response = server_recv(server);
575        assert_eq!("metric:9.1|g", response);
576    }
577
578    #[test]
579    fn test_sending_incr() {
580        let host = next_test_ip4();
581        let server = make_server(&host);
582        let client = Client::new(&host, "myapp", None).unwrap();
583
584        client.incr("metric", &None);
585
586        let response = server_recv(server);
587        assert_eq!("myapp.metric:1|c", response);
588    }
589
590    #[test]
591    fn test_sending_decr() {
592        let host = next_test_ip4();
593        let server = make_server(&host);
594        let client = Client::new(&host, "myapp", None).unwrap();
595
596        client.decr("metric", &None);
597
598        let response = server_recv(server);
599        assert_eq!("myapp.metric:-1|c", response);
600    }
601
602    #[test]
603    fn test_sending_count() {
604        let host = next_test_ip4();
605        let server = make_server(&host);
606        let client = Client::new(&host, "myapp", None).unwrap();
607
608        client.count("metric", 12.2, &None);
609
610        let response = server_recv(server);
611        assert_eq!("myapp.metric:12.2|c", response);
612    }
613
614    #[test]
615    fn test_sending_timer() {
616        let host = next_test_ip4();
617        let server = make_server(&host);
618        let client = Client::new(&host, "myapp", None).unwrap();
619
620        client.timer("metric", 21.39, &None);
621
622        let response = server_recv(server);
623        assert_eq!("myapp.metric:21.39|ms", response);
624    }
625
626    #[test]
627    fn test_sending_timed_block() {
628        let host = next_test_ip4();
629        let server = make_server(&host);
630        let client = Client::new(&host, "myapp", None).unwrap();
631        struct TimeTest {
632            num: u8,
633        }
634
635        let mut t = TimeTest { num: 10 };
636        let output = client.time("time_block", &None, || {
637            t.num += 2;
638            "a string"
639        });
640
641        let response = server_recv(server);
642        assert_eq!(output, "a string");
643        assert_eq!(t.num, 12);
644        assert!(response.contains("myapp.time_block"));
645        assert!(response.contains("|ms"));
646    }
647
648    #[test]
649    fn test_sending_histogram() {
650        let host = next_test_ip4();
651        let server = make_server(&host);
652        let client = Client::new(&host, "myapp", None).unwrap();
653
654        // without tags
655        client.histogram("metric", 9.1, &None);
656        let mut response = server_recv(server.try_clone().unwrap());
657        assert_eq!("myapp.metric:9.1|h", response);
658        // with tags
659        client.histogram("metric", 9.1, &Some(vec!["tag1", "tag2:test"]));
660        response = server_recv(server.try_clone().unwrap());
661        assert_eq!("myapp.metric:9.1|h|#tag1,tag2:test", response);
662    }
663
664    #[test]
665    fn test_sending_histogram_with_constant_tags() {
666        let host = next_test_ip4();
667        let server = make_server(&host);
668        let client =
669            Client::new(&host, "myapp", Some(vec!["tag1common", "tag2common:test"])).unwrap();
670
671        // without tags
672        client.histogram("metric", 9.1, &None);
673        let mut response = server_recv(server.try_clone().unwrap());
674        assert_eq!("myapp.metric:9.1|h|#tag1common,tag2common:test", response);
675        // with tags
676        let tags = &Some(vec!["tag1", "tag2:test"]);
677        client.histogram("metric", 9.1, tags);
678        response = server_recv(server.try_clone().unwrap());
679        assert_eq!(
680            "myapp.metric:9.1|h|#tag1common,tag2common:test,tag1,tag2:test",
681            response
682        );
683        // repeat
684        client.histogram("metric", 19.12, tags);
685        response = server_recv(server.try_clone().unwrap());
686        assert_eq!(
687            "myapp.metric:19.12|h|#tag1common,tag2common:test,tag1,tag2:test",
688            response
689        );
690    }
691
692    #[test]
693    fn test_sending_event_with_tags() {
694        let host = next_test_ip4();
695        let server = make_server(&host);
696        let client = Client::new(&host, "myapp", None).unwrap();
697
698        client.event(
699            "Title Test",
700            "Text ABC",
701            AlertType::Error,
702            &Some(vec!["tag1", "tag2:test"]),
703        );
704
705        let response = server_recv(server);
706        assert_eq!(
707            "_e{10,8}:Title Test|Text ABC|t:error|#tag1,tag2:test",
708            response
709        );
710    }
711
712    #[test]
713    fn test_sending_service_check_with_tags() {
714        let host = next_test_ip4();
715        let server = make_server(&host);
716        let client = Client::new(&host, "myapp", None).unwrap();
717
718        client.service_check(
719            "Service.check.name",
720            ServiceCheckStatus::Critical,
721            &Some(vec!["tag1", "tag2:test"]),
722        );
723
724        let response = server_recv(server);
725        assert_eq!("_sc|Service.check.name|2|#tag1,tag2:test", response);
726    }
727
728    #[test]
729    fn test_pipeline_sending_time_block() {
730        let host = next_test_ip4();
731        let server = make_server(&host);
732        let client = Client::new(&host, "myapp", None).unwrap();
733        let mut pipeline = client.pipeline();
734        pipeline.gauge("metric", 9.1);
735        struct TimeTest {
736            num: u8,
737        }
738
739        let mut t = TimeTest { num: 10 };
740        pipeline.time("time_block", || {
741            t.num += 2;
742        });
743        pipeline.send(&client);
744
745        let response = server_recv(server);
746        assert_eq!(t.num, 12);
747        assert_eq!("myapp.metric:9.1|g\nmyapp.time_block:0|ms", response);
748    }
749
750    #[test]
751    fn test_pipeline_sending_gauge() {
752        let host = next_test_ip4();
753        let server = make_server(&host);
754        let client = Client::new(&host, "myapp", None).unwrap();
755        let mut pipeline = client.pipeline();
756        pipeline.gauge("metric", 9.1);
757        pipeline.send(&client);
758
759        let response = server_recv(server);
760        assert_eq!("myapp.metric:9.1|g", response);
761    }
762
763    #[test]
764    fn test_pipeline_sending_histogram() {
765        let host = next_test_ip4();
766        let server = make_server(&host);
767        let client = Client::new(&host, "myapp", None).unwrap();
768        let mut pipeline = client.pipeline();
769        pipeline.histogram("metric", 9.1);
770        pipeline.send(&client);
771
772        let response = server_recv(server);
773        assert_eq!("myapp.metric:9.1|h", response);
774    }
775
776    #[test]
777    fn test_pipeline_sending_multiple_data() {
778        let host = next_test_ip4();
779        let server = make_server(&host);
780        let client = Client::new(&host, "myapp", None).unwrap();
781        let mut pipeline = client.pipeline();
782        pipeline.gauge("metric", 9.1);
783        pipeline.count("metric", 12.2);
784        pipeline.send(&client);
785
786        let response = server_recv(server);
787        assert_eq!("myapp.metric:9.1|g\nmyapp.metric:12.2|c", response);
788    }
789
790    #[test]
791    fn test_pipeline_set_max_udp_size() {
792        let host = next_test_ip4();
793        let server = make_server(&host);
794        let client = Client::new(&host, "myapp", None).unwrap();
795        let mut pipeline = client.pipeline();
796        pipeline.set_max_udp_size(20);
797        pipeline.gauge("metric", 9.1);
798        pipeline.count("metric", 12.2);
799        pipeline.send(&client);
800
801        let response = server_recv(server);
802        assert_eq!("myapp.metric:9.1|g", response);
803    }
804
805    #[test]
806    fn test_pipeline_send_metric_after_pipeline() {
807        let host = next_test_ip4();
808        let server = make_server(&host);
809        let client = Client::new(&host, "myapp", None).unwrap();
810        let mut pipeline = client.pipeline();
811
812        pipeline.gauge("load", 9.0);
813        pipeline.count("customers", 7.0);
814        pipeline.send(&client);
815
816        // Should still be able to send metrics
817        // with the client.
818        client.count("customers", 6.0, &None);
819
820        let response = server_recv(server);
821        assert_eq!("myapp.load:9|g\nmyapp.customers:7|c", response);
822    }
823}