// libhoney/transmission.rs

/*! Transmission handles the transmission of events to Honeycomb.

Events are grouped into batches and posted to the Honeycomb batch API.
*/
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use crossbeam_channel::{
9    bounded, Receiver as ChannelReceiver, RecvTimeoutError, Sender as ChannelSender,
10};
11
12use log::{error, info, trace};
13use parking_lot::Mutex;
14use reqwest::{header, StatusCode};
15use tokio::runtime::{Builder, Runtime};
16
17use crate::errors::{Error, Result};
18use crate::event::Event;
19use crate::eventdata::EventData;
20use crate::events::{Events, EventsResponse};
21use crate::response::{HoneyResponse, Response};
22use crate::sender::Sender;
23
/// Path of the Honeycomb batch API; the dataset name is appended to it.
const BATCH_ENDPOINT: &str = "/1/batch/";

/// Product name used as the prefix of the default User-Agent header.
const DEFAULT_NAME_PREFIX: &str = "libhoney-rust";

/// How many events to collect in a batch.
const DEFAULT_MAX_BATCH_SIZE: usize = 50;
/// How many batches to maintain in parallel.
const DEFAULT_MAX_CONCURRENT_BATCHES: usize = 10;
/// How frequently to send unfilled batches.
const DEFAULT_BATCH_TIMEOUT: Duration = Duration::from_millis(100);
/// How many events to queue up for busy batches.
const DEFAULT_PENDING_WORK_CAPACITY: usize = 10_000;
/// How much to wait to send an event.
const DEFAULT_SEND_TIMEOUT: Duration = Duration::from_secs(1);

/// Options includes various options to tweak the behaviour of the sender.
#[derive(Debug, Clone)]
pub struct Options {
    /// How many events to collect into a batch before sending it.
    ///
    /// Defaults to `DEFAULT_MAX_BATCH_SIZE` (50).
    pub max_batch_size: usize,

    /// How many batches can be in flight simultaneously. This value is used as the number of
    /// core threads of the runtime that sends the batches.
    ///
    /// Defaults to `DEFAULT_MAX_CONCURRENT_BATCHES` (10).
    pub max_concurrent_batches: usize,

    /// How often to send off batches that have not filled up yet.
    ///
    /// Defaults to `DEFAULT_BATCH_TIMEOUT` (100ms).
    pub batch_timeout: Duration,

    /// How many events to allow to pile up. The work and response queues are each bounded
    /// to `4 * pending_work_capacity` entries. Once the work queue is full, new events are
    /// answered with a "queue overflow" response instead of being sent.
    ///
    /// Defaults to `DEFAULT_PENDING_WORK_CAPACITY` (10 000).
    pub pending_work_capacity: usize,

    /// Optional text appended to the "User-Agent" header that libhoney sends along with
    /// each batch. The default User-Agent is "libhoney-rust/<version>".
    ///
    /// The addition is appended verbatim: no separator is inserted. Include a leading
    /// space and use the product-name/version format, e.g. `" myapp/1.0"`.
    pub user_agent_addition: Option<String>,
}

63impl Default for Options {
64    fn default() -> Self {
65        Self {
66            max_batch_size: DEFAULT_MAX_BATCH_SIZE,
67            max_concurrent_batches: DEFAULT_MAX_CONCURRENT_BATCHES,
68            batch_timeout: DEFAULT_BATCH_TIMEOUT,
69            pending_work_capacity: DEFAULT_PENDING_WORK_CAPACITY,
70            user_agent_addition: None,
71        }
72    }
73}
74
75/// `Transmission` handles collecting and sending individual events to Honeycomb
76#[derive(Debug, Clone)]
77pub struct Transmission {
78    pub(crate) options: Options,
79    user_agent: String,
80
81    runtime: Arc<Mutex<Runtime>>,
82    http_client: reqwest::Client,
83
84    work_sender: ChannelSender<Event>,
85    work_receiver: ChannelReceiver<Event>,
86    response_sender: ChannelSender<Response>,
87    response_receiver: ChannelReceiver<Response>,
88}
89
90impl Drop for Transmission {
91    fn drop(&mut self) {
92        self.stop().unwrap();
93    }
94}
95
96impl Sender for Transmission {
97    fn start(&mut self) {
98        let work_receiver = self.work_receiver.clone();
99        let response_sender = self.response_sender.clone();
100        let options = self.options.clone();
101        let user_agent = self.user_agent.clone();
102        let http_client = self.http_client.clone();
103
104        info!("transmission starting");
105        // thread that processes all the work received
106        let runtime = self.runtime.clone();
107        runtime.lock().spawn(async {
108            Self::process_work(
109                work_receiver,
110                response_sender,
111                options,
112                user_agent,
113                http_client,
114            )
115            .await
116        });
117    }
118
119    fn stop(&mut self) -> Result<()> {
120        info!("transmission stopping");
121        if self.work_sender.is_full() {
122            error!("work sender is full");
123            return Err(Error::sender_full("work"));
124        }
125        Ok(self.work_sender.send(Event::stop_event())?)
126    }
127
128    fn send(&mut self, event: Event) {
129        let clock = Instant::now();
130        if self.work_sender.is_full() {
131            error!("work sender is full");
132            self.response_sender
133                .send(Response {
134                    status_code: None,
135                    body: None,
136                    duration: clock.elapsed(),
137                    metadata: event.metadata,
138                    error: Some("queue overflow".to_string()),
139                })
140                .unwrap_or_else(|e| {
141                    error!("response dropped, error: {}", e);
142                });
143        } else {
144            let runtime = self.runtime.clone();
145            let work_sender = self.work_sender.clone();
146            let response_sender = self.response_sender.clone();
147            runtime.lock().spawn(async move {
148                work_sender
149                    .clone()
150                    .send_timeout(event.clone(), DEFAULT_SEND_TIMEOUT)
151                    .map_err(|e| {
152                        response_sender
153                            .send(Response {
154                                status_code: None,
155                                body: None,
156                                duration: clock.elapsed(),
157                                metadata: event.metadata,
158                                error: Some(e.to_string()),
159                            })
160                            .unwrap_or_else(|e| {
161                                error!("response dropped, error: {}", e);
162                            });
163                    })
164            });
165        }
166    }
167
168    /// responses provides access to the receiver
169    fn responses(&self) -> ChannelReceiver<Response> {
170        self.response_receiver.clone()
171    }
172}
173
174impl Transmission {
175    fn new_runtime(options: Option<&Options>) -> Result<Runtime> {
176        let mut builder = Builder::new();
177        if let Some(opts) = options {
178            builder.core_threads(opts.max_concurrent_batches);
179        };
180        Ok(builder
181            .thread_name("libhoney-rust")
182            .thread_stack_size(3 * 1024 * 1024)
183            .threaded_scheduler()
184            .enable_io()
185            .enable_time()
186            .build()?)
187    }
188
189    pub(crate) fn new(options: Options) -> Result<Self> {
190        let runtime = Self::new_runtime(None)?;
191
192        let (work_sender, work_receiver) = bounded(options.pending_work_capacity * 4);
193        let (response_sender, response_receiver) = bounded(options.pending_work_capacity * 4);
194
195        Ok(Self {
196            runtime: Arc::new(Mutex::new(runtime)),
197            options,
198            work_sender,
199            work_receiver,
200            response_sender,
201            response_receiver,
202            user_agent: format!("{}/{}", DEFAULT_NAME_PREFIX, env!("CARGO_PKG_VERSION")),
203            http_client: reqwest::Client::new(),
204        })
205    }
206
207    async fn process_work(
208        work_receiver: ChannelReceiver<Event>,
209        response_sender: ChannelSender<Response>,
210        options: Options,
211        user_agent: String,
212        http_client: reqwest::Client,
213    ) {
214        let runtime = Self::new_runtime(Some(&options)).expect("Could not start new runtime");
215        let mut batches: HashMap<String, Events> = HashMap::new();
216        let mut expired = false;
217
218        loop {
219            let options = options.clone();
220
221            match work_receiver.recv_timeout(options.batch_timeout) {
222                Ok(event) => {
223                    if event.fields.contains_key("internal_stop_event") {
224                        info!("got 'internal_stop_event' event");
225                        break;
226                    }
227                    let key = format!(
228                        "{}_{}_{}",
229                        event.options.api_host, event.options.api_key, event.options.dataset
230                    );
231                    batches
232                        .entry(key)
233                        .and_modify(|v| v.push(event.clone()))
234                        .or_insert({
235                            let mut v = Vec::with_capacity(options.max_batch_size);
236                            v.push(event);
237                            v
238                        });
239                }
240                Err(RecvTimeoutError::Timeout) => {
241                    expired = true;
242                }
243                Err(RecvTimeoutError::Disconnected) => {
244                    // TODO(nlopes): is this the right behaviour?
245                    break;
246                }
247            };
248
249            let mut batches_sent = Vec::new();
250            for (batch_name, batch) in batches.iter_mut() {
251                if batch.is_empty() {
252                    break;
253                }
254                let options = options.clone();
255
256                if batch.len() >= options.max_batch_size || expired {
257                    trace!(
258                        "Timer expired or batch size exceeded with {} event(s)",
259                        batch.len()
260                    );
261                    let batch_copy = batch.clone();
262                    let batch_response_sender = response_sender.clone();
263                    let batch_user_agent = user_agent.to_string();
264                    // This is a shallow clone that allows reusing HTTPS connections across batches.
265                    // From the reqwest docs:
266                    //   "You do not have to wrap the Client it in an Rc or Arc to reuse it, because
267                    //    it already uses an Arc internally."
268                    let client_copy = http_client.clone();
269
270                    runtime.spawn(async move {
271                        for response in Self::send_batch(
272                            batch_copy,
273                            options,
274                            batch_user_agent,
275                            Instant::now(),
276                            client_copy,
277                        )
278                        .await
279                        {
280                            batch_response_sender
281                                .send(response)
282                                .expect("unable to enqueue batch response");
283                        }
284                    });
285                    batches_sent.push(batch_name.to_string())
286                }
287            }
288            // clear all sent batches
289            batches_sent.iter_mut().for_each(|name| {
290                batches.remove(name);
291            });
292
293            // If we get here and we were expired, then we've already triggered a send, so
294            // we reset this to ensure it kicks off again
295            if expired {
296                expired = false;
297            }
298        }
299        info!("Shutting down batch processing runtime");
300        runtime.shutdown_background();
301        info!("Batch processing runtime shut down");
302    }
303
304    async fn send_batch(
305        events: Events,
306        options: Options,
307        user_agent: String,
308        clock: Instant,
309        client: reqwest::Client,
310    ) -> Vec<Response> {
311        let mut opts: crate::client::Options = crate::client::Options::default();
312        let mut payload: Vec<EventData> = Vec::new();
313
314        for event in &events {
315            opts = event.options.clone();
316            payload.push(EventData {
317                data: event.fields.clone(),
318                time: event.timestamp,
319                samplerate: event.options.sample_rate,
320            })
321        }
322
323        let endpoint = format!("{}{}{}", opts.api_host, BATCH_ENDPOINT, &opts.dataset);
324
325        let user_agent = if let Some(ua_addition) = options.user_agent_addition {
326            format!("{}{}", user_agent, ua_addition)
327        } else {
328            user_agent
329        };
330
331        trace!("Sending payload: {:#?}", payload);
332        let response = client
333            .post(&endpoint)
334            .header(header::USER_AGENT, user_agent)
335            .header(header::CONTENT_TYPE, "application/json")
336            .header("X-Honeycomb-Team", opts.api_key)
337            .json(&payload)
338            .send()
339            .await;
340
341        trace!("Received response: {:#?}", response);
342        match response {
343            Ok(res) => match res.status() {
344                StatusCode::OK => {
345                    let responses: Vec<HoneyResponse>;
346                    match res.json().await {
347                        Ok(r) => responses = r,
348                        Err(e) => {
349                            return events.to_response(None, None, clock, Some(e.to_string()));
350                        }
351                    }
352                    let total_responses = if responses.is_empty() {
353                        1
354                    } else {
355                        responses.len() as u64
356                    };
357
358                    let spent = Duration::from_secs(clock.elapsed().as_secs() / total_responses);
359
360                    responses
361                        .iter()
362                        .zip(events.iter())
363                        .map(|(hr, e)| Response {
364                            status_code: StatusCode::from_u16(hr.status).ok(),
365                            body: None,
366                            duration: spent,
367                            metadata: e.metadata.clone(),
368                            error: hr.error.clone(),
369                        })
370                        .collect()
371                }
372                status => {
373                    let body = match res.text().await {
374                        Ok(t) => t,
375                        Err(e) => format!("HTTP Error but could not read response body: {}", e),
376                    };
377                    events.to_response(Some(status), Some(body), clock, None)
378                }
379            },
380            Err(err) => events.to_response(None, None, clock, Some(err.to_string())),
381        }
382    }
383}
384
#[cfg(test)]
mod tests {
    use reqwest::StatusCode;

    use super::*;
    use crate::client;

    /// Upper bound on how long a test waits for one response. A regression then makes the
    /// test fail instead of blocking forever on the response channel.
    const RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);

    /// Receives the next response, failing the test if none arrives within `RESPONSE_TIMEOUT`.
    fn next_response(responses: &ChannelReceiver<Response>) -> Response {
        responses
            .recv_timeout(RESPONSE_TIMEOUT)
            .expect("did not receive an expected response")
    }

    #[test]
    fn test_defaults() {
        let transmission = Transmission::new(Options::default()).unwrap();
        assert_eq!(
            transmission.user_agent,
            format!("{}/{}", DEFAULT_NAME_PREFIX, env!("CARGO_PKG_VERSION"))
        );

        assert_eq!(transmission.options.max_batch_size, DEFAULT_MAX_BATCH_SIZE);
        assert_eq!(transmission.options.batch_timeout, DEFAULT_BATCH_TIMEOUT);
        assert_eq!(
            transmission.options.max_concurrent_batches,
            DEFAULT_MAX_CONCURRENT_BATCHES
        );
        assert_eq!(
            transmission.options.pending_work_capacity,
            DEFAULT_PENDING_WORK_CAPACITY
        );
    }

    #[test]
    fn test_modifiable_defaults() {
        let transmission = Transmission::new(Options {
            user_agent_addition: Some(" something/0.3".to_string()),
            ..Options::default()
        })
        .unwrap();
        assert_eq!(
            transmission.options.user_agent_addition,
            Some(" something/0.3".to_string())
        );
    }

    #[test]
    fn test_responses() {
        use crate::fields::FieldHolder;

        let mut transmission = Transmission::new(Options {
            max_batch_size: 5,
            ..Options::default()
        })
        .unwrap();
        transmission.start();

        let api_host = &mockito::server_url();
        let _m = mockito::mock(
            "POST",
            mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()),
        )
        .with_status(200)
        .with_header("content-type", "application/json")
        .with_body(
            r#"
[
  { "status":202 },
  { "status":202 },
  { "status":202 },
  { "status":202 },
  { "status":202 }
]
"#,
        )
        .create();

        for i in 0..5 {
            let mut event = Event::new(&client::Options {
                api_key: "some_api_key".to_string(),
                api_host: api_host.to_string(),
                ..client::Options::default()
            });
            event.add_field("id", serde_json::from_str(&i.to_string()).unwrap());
            transmission.send(event);
        }

        // Every one of the five events must be answered, however they ended up batched.
        let responses = transmission.responses();
        for _ in 0..5 {
            let response = next_response(&responses);
            assert_eq!(response.status_code, Some(StatusCode::ACCEPTED));
            assert_eq!(response.body, None);
        }
        transmission.stop().unwrap();
    }

    #[test]
    fn test_metadata() {
        use serde_json::json;

        let mut transmission = Transmission::new(Options {
            max_batch_size: 1,
            ..Options::default()
        })
        .unwrap();
        transmission.start();

        let api_host = &mockito::server_url();
        let _m = mockito::mock(
            "POST",
            mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()),
        )
        .with_status(200)
        .with_header("content-type", "application/json")
        .with_body(
            r#"
[
  { "status":202 }
]
"#,
        )
        .create();

        let metadata = Some(json!("some metadata in a string"));
        let mut event = Event::new(&client::Options {
            api_key: "some_api_key".to_string(),
            api_host: api_host.to_string(),
            ..client::Options::default()
        });
        event.metadata = metadata.clone();
        transmission.send(event);

        let response = next_response(&transmission.responses());
        assert_eq!(response.status_code, Some(StatusCode::ACCEPTED));
        assert_eq!(response.metadata, metadata);
        transmission.stop().unwrap();
    }

    #[test]
    fn test_multiple_batches() {
        // What we try to test here is if events are sent in separate batches, depending
        // on their combination of api_host, api_key, dataset.
        //
        // For that, we set max_batch_size to 2, then we send 3 events, 2 with one
        // combination and 1 with another.  Only the two should be sent, and we should get
        // back two responses.
        use serde_json::json;

        let api_host = &mockito::server_url();
        let _m = mockito::mock(
            "POST",
            mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()),
        )
        .with_status(200)
        .with_header("content-type", "application/json")
        .with_body(
            r#"
[
  { "status":202 },
  { "status":202 }
]"#,
        )
        .create();

        // Start only after the mock is registered, so the 5s batch window cannot run out
        // while the test is still setting up.
        let mut transmission = Transmission::new(Options {
            max_batch_size: 2,
            batch_timeout: Duration::from_secs(5),
            ..Options::default()
        })
        .unwrap();
        transmission.start();

        let mut event1 = Event::new(&client::Options {
            api_key: "some_api_key".to_string(),
            api_host: api_host.to_string(),
            dataset: "same".to_string(),
            ..client::Options::default()
        });
        event1.metadata = Some(json!("event1"));
        let mut event2 = event1.clone();
        event2.metadata = Some(json!("event2"));
        let mut event3 = event1.clone();
        event3.options.dataset = "other".to_string();
        event3.metadata = Some(json!("event3"));

        transmission.send(event3);
        transmission.send(event2);
        transmission.send(event1);

        let responses = transmission.responses();
        let response1 = next_response(&responses);
        let response2 = next_response(&responses);
        // event3 is alone in its batch and the batch timeout is 5s, so no third response
        // may show up yet.
        assert!(responses.recv_timeout(Duration::from_millis(250)).is_err());

        assert_eq!(response1.status_code, Some(StatusCode::ACCEPTED));
        assert_eq!(response2.status_code, Some(StatusCode::ACCEPTED));

        // Responses can come out of order so we check against any of the metadata
        assert!(
            response1.metadata == Some(json!("event1"))
                || response1.metadata == Some(json!("event2"))
        );
        assert!(
            response2.metadata == Some(json!("event1"))
                || response2.metadata == Some(json!("event2"))
        );
        transmission.stop().unwrap();
    }

    #[test]
    fn test_bad_response() {
        use serde_json::json;

        let mut transmission = Transmission::new(Options::default()).unwrap();
        transmission.start();

        let api_host = &mockito::server_url();
        let _m = mockito::mock(
            "POST",
            mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()),
        )
        .with_status(400)
        .with_header("content-type", "application/json")
        .with_body("request body is malformed and cannot be read as JSON")
        .create();

        let mut event = Event::new(&client::Options {
            api_key: "some_api_key".to_string(),
            api_host: api_host.to_string(),
            ..client::Options::default()
        });

        event.metadata = Some(json!("some metadata in a string"));
        transmission.send(event);

        let response = next_response(&transmission.responses());
        assert_eq!(response.status_code, Some(StatusCode::BAD_REQUEST));
        assert_eq!(
            response.body,
            Some("request body is malformed and cannot be read as JSON".to_string())
        );
        transmission.stop().unwrap();
    }
}