1use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use crossbeam_channel::{
9 bounded, Receiver as ChannelReceiver, RecvTimeoutError, Sender as ChannelSender,
10};
11
12use log::{error, info, trace};
13use parking_lot::Mutex;
14use reqwest::{header, StatusCode};
15use tokio::runtime::{Builder, Runtime};
16
17use crate::errors::{Error, Result};
18use crate::event::Event;
19use crate::eventdata::EventData;
20use crate::events::{Events, EventsResponse};
21use crate::response::{HoneyResponse, Response};
22use crate::sender::Sender;
23
/// Path segment of the batch ingestion endpoint; it is placed between the API host and the
/// dataset name when the request URL is built.
const BATCH_ENDPOINT: &str = "/1/batch/";

/// Product name used as the first half of the default `User-Agent` value.
const DEFAULT_NAME_PREFIX: &str = "libhoney-rust";
/// Default value for `Options::max_batch_size`.
const DEFAULT_MAX_BATCH_SIZE: usize = 50;
/// Default value for `Options::max_concurrent_batches`.
const DEFAULT_MAX_CONCURRENT_BATCHES: usize = 10;
/// Default value for `Options::batch_timeout`.
const DEFAULT_BATCH_TIMEOUT: Duration = Duration::from_millis(100);
/// Default value for `Options::pending_work_capacity`.
const DEFAULT_PENDING_WORK_CAPACITY: usize = 10_000;
/// How long `Transmission::send` waits for a free slot in the work queue before reporting the
/// failure as an error response.
const DEFAULT_SEND_TIMEOUT: Duration = Duration::from_millis(1_000);
37
/// Tunable settings for a `Transmission`.
#[derive(Debug, Clone)]
pub struct Options {
    /// Number of events at which a batch is sent without waiting for the batch timeout.
    pub max_batch_size: usize,

    /// Number of core threads given to the runtime on which batch sends are spawned.
    pub max_concurrent_batches: usize,

    /// How long the work loop waits for a new event before sending every pending batch.
    pub batch_timeout: Duration,

    /// Sizing factor for the work and response queues; each queue is created with four times
    /// this many slots.
    pub pending_work_capacity: usize,

    /// Optional text appended verbatim to the default `User-Agent` header value.
    pub user_agent_addition: Option<String>,
}
62
63impl Default for Options {
64 fn default() -> Self {
65 Self {
66 max_batch_size: DEFAULT_MAX_BATCH_SIZE,
67 max_concurrent_batches: DEFAULT_MAX_CONCURRENT_BATCHES,
68 batch_timeout: DEFAULT_BATCH_TIMEOUT,
69 pending_work_capacity: DEFAULT_PENDING_WORK_CAPACITY,
70 user_agent_addition: None,
71 }
72 }
73}
74
/// Sender that queues events, groups them into batches and posts each batch over HTTP,
/// publishing one `Response` per event on a response channel.
///
/// NOTE(review): the type derives `Clone` and its `Drop` impl calls `stop()`, so dropping any
/// clone appears to enqueue a stop event for the shared work loop; confirm this is intended.
#[derive(Debug, Clone)]
pub struct Transmission {
    /// Batching and queue-sizing settings supplied at construction.
    pub(crate) options: Options,
    /// Base `User-Agent` value, before any `user_agent_addition` is appended.
    user_agent: String,

    /// Runtime on which the work loop and the enqueue tasks are spawned.
    runtime: Arc<Mutex<Runtime>>,
    /// HTTP client cloned into every batch send.
    http_client: reqwest::Client,

    /// Producer side of the queue of events waiting to be batched.
    work_sender: ChannelSender<Event>,
    /// Consumer side of the work queue; a clone is handed to the work loop by `start`.
    work_receiver: ChannelReceiver<Event>,
    /// Producer side of the response queue.
    response_sender: ChannelSender<Response>,
    /// Consumer side of the response queue; clones are handed out by `responses`.
    response_receiver: ChannelReceiver<Response>,
}
89
90impl Drop for Transmission {
91 fn drop(&mut self) {
92 self.stop().unwrap();
93 }
94}
95
96impl Sender for Transmission {
97 fn start(&mut self) {
98 let work_receiver = self.work_receiver.clone();
99 let response_sender = self.response_sender.clone();
100 let options = self.options.clone();
101 let user_agent = self.user_agent.clone();
102 let http_client = self.http_client.clone();
103
104 info!("transmission starting");
105 let runtime = self.runtime.clone();
107 runtime.lock().spawn(async {
108 Self::process_work(
109 work_receiver,
110 response_sender,
111 options,
112 user_agent,
113 http_client,
114 )
115 .await
116 });
117 }
118
119 fn stop(&mut self) -> Result<()> {
120 info!("transmission stopping");
121 if self.work_sender.is_full() {
122 error!("work sender is full");
123 return Err(Error::sender_full("work"));
124 }
125 Ok(self.work_sender.send(Event::stop_event())?)
126 }
127
128 fn send(&mut self, event: Event) {
129 let clock = Instant::now();
130 if self.work_sender.is_full() {
131 error!("work sender is full");
132 self.response_sender
133 .send(Response {
134 status_code: None,
135 body: None,
136 duration: clock.elapsed(),
137 metadata: event.metadata,
138 error: Some("queue overflow".to_string()),
139 })
140 .unwrap_or_else(|e| {
141 error!("response dropped, error: {}", e);
142 });
143 } else {
144 let runtime = self.runtime.clone();
145 let work_sender = self.work_sender.clone();
146 let response_sender = self.response_sender.clone();
147 runtime.lock().spawn(async move {
148 work_sender
149 .clone()
150 .send_timeout(event.clone(), DEFAULT_SEND_TIMEOUT)
151 .map_err(|e| {
152 response_sender
153 .send(Response {
154 status_code: None,
155 body: None,
156 duration: clock.elapsed(),
157 metadata: event.metadata,
158 error: Some(e.to_string()),
159 })
160 .unwrap_or_else(|e| {
161 error!("response dropped, error: {}", e);
162 });
163 })
164 });
165 }
166 }
167
168 fn responses(&self) -> ChannelReceiver<Response> {
170 self.response_receiver.clone()
171 }
172}
173
174impl Transmission {
175 fn new_runtime(options: Option<&Options>) -> Result<Runtime> {
176 let mut builder = Builder::new();
177 if let Some(opts) = options {
178 builder.core_threads(opts.max_concurrent_batches);
179 };
180 Ok(builder
181 .thread_name("libhoney-rust")
182 .thread_stack_size(3 * 1024 * 1024)
183 .threaded_scheduler()
184 .enable_io()
185 .enable_time()
186 .build()?)
187 }
188
189 pub(crate) fn new(options: Options) -> Result<Self> {
190 let runtime = Self::new_runtime(None)?;
191
192 let (work_sender, work_receiver) = bounded(options.pending_work_capacity * 4);
193 let (response_sender, response_receiver) = bounded(options.pending_work_capacity * 4);
194
195 Ok(Self {
196 runtime: Arc::new(Mutex::new(runtime)),
197 options,
198 work_sender,
199 work_receiver,
200 response_sender,
201 response_receiver,
202 user_agent: format!("{}/{}", DEFAULT_NAME_PREFIX, env!("CARGO_PKG_VERSION")),
203 http_client: reqwest::Client::new(),
204 })
205 }
206
207 async fn process_work(
208 work_receiver: ChannelReceiver<Event>,
209 response_sender: ChannelSender<Response>,
210 options: Options,
211 user_agent: String,
212 http_client: reqwest::Client,
213 ) {
214 let runtime = Self::new_runtime(Some(&options)).expect("Could not start new runtime");
215 let mut batches: HashMap<String, Events> = HashMap::new();
216 let mut expired = false;
217
218 loop {
219 let options = options.clone();
220
221 match work_receiver.recv_timeout(options.batch_timeout) {
222 Ok(event) => {
223 if event.fields.contains_key("internal_stop_event") {
224 info!("got 'internal_stop_event' event");
225 break;
226 }
227 let key = format!(
228 "{}_{}_{}",
229 event.options.api_host, event.options.api_key, event.options.dataset
230 );
231 batches
232 .entry(key)
233 .and_modify(|v| v.push(event.clone()))
234 .or_insert({
235 let mut v = Vec::with_capacity(options.max_batch_size);
236 v.push(event);
237 v
238 });
239 }
240 Err(RecvTimeoutError::Timeout) => {
241 expired = true;
242 }
243 Err(RecvTimeoutError::Disconnected) => {
244 break;
246 }
247 };
248
249 let mut batches_sent = Vec::new();
250 for (batch_name, batch) in batches.iter_mut() {
251 if batch.is_empty() {
252 break;
253 }
254 let options = options.clone();
255
256 if batch.len() >= options.max_batch_size || expired {
257 trace!(
258 "Timer expired or batch size exceeded with {} event(s)",
259 batch.len()
260 );
261 let batch_copy = batch.clone();
262 let batch_response_sender = response_sender.clone();
263 let batch_user_agent = user_agent.to_string();
264 let client_copy = http_client.clone();
269
270 runtime.spawn(async move {
271 for response in Self::send_batch(
272 batch_copy,
273 options,
274 batch_user_agent,
275 Instant::now(),
276 client_copy,
277 )
278 .await
279 {
280 batch_response_sender
281 .send(response)
282 .expect("unable to enqueue batch response");
283 }
284 });
285 batches_sent.push(batch_name.to_string())
286 }
287 }
288 batches_sent.iter_mut().for_each(|name| {
290 batches.remove(name);
291 });
292
293 if expired {
296 expired = false;
297 }
298 }
299 info!("Shutting down batch processing runtime");
300 runtime.shutdown_background();
301 info!("Batch processing runtime shut down");
302 }
303
304 async fn send_batch(
305 events: Events,
306 options: Options,
307 user_agent: String,
308 clock: Instant,
309 client: reqwest::Client,
310 ) -> Vec<Response> {
311 let mut opts: crate::client::Options = crate::client::Options::default();
312 let mut payload: Vec<EventData> = Vec::new();
313
314 for event in &events {
315 opts = event.options.clone();
316 payload.push(EventData {
317 data: event.fields.clone(),
318 time: event.timestamp,
319 samplerate: event.options.sample_rate,
320 })
321 }
322
323 let endpoint = format!("{}{}{}", opts.api_host, BATCH_ENDPOINT, &opts.dataset);
324
325 let user_agent = if let Some(ua_addition) = options.user_agent_addition {
326 format!("{}{}", user_agent, ua_addition)
327 } else {
328 user_agent
329 };
330
331 trace!("Sending payload: {:#?}", payload);
332 let response = client
333 .post(&endpoint)
334 .header(header::USER_AGENT, user_agent)
335 .header(header::CONTENT_TYPE, "application/json")
336 .header("X-Honeycomb-Team", opts.api_key)
337 .json(&payload)
338 .send()
339 .await;
340
341 trace!("Received response: {:#?}", response);
342 match response {
343 Ok(res) => match res.status() {
344 StatusCode::OK => {
345 let responses: Vec<HoneyResponse>;
346 match res.json().await {
347 Ok(r) => responses = r,
348 Err(e) => {
349 return events.to_response(None, None, clock, Some(e.to_string()));
350 }
351 }
352 let total_responses = if responses.is_empty() {
353 1
354 } else {
355 responses.len() as u64
356 };
357
358 let spent = Duration::from_secs(clock.elapsed().as_secs() / total_responses);
359
360 responses
361 .iter()
362 .zip(events.iter())
363 .map(|(hr, e)| Response {
364 status_code: StatusCode::from_u16(hr.status).ok(),
365 body: None,
366 duration: spent,
367 metadata: e.metadata.clone(),
368 error: hr.error.clone(),
369 })
370 .collect()
371 }
372 status => {
373 let body = match res.text().await {
374 Ok(t) => t,
375 Err(e) => format!("HTTP Error but could not read response body: {}", e),
376 };
377 events.to_response(Some(status), Some(body), clock, None)
378 }
379 },
380 Err(err) => events.to_response(None, None, clock, Some(err.to_string())),
381 }
382 }
383}
384
385#[cfg(test)]
386mod tests {
387 use reqwest::StatusCode;
388
389 use super::*;
390 use crate::client;
391
392 #[test]
393 fn test_defaults() {
394 let transmission = Transmission::new(Options::default()).unwrap();
395 assert_eq!(
396 transmission.user_agent,
397 format!("{}/{}", DEFAULT_NAME_PREFIX, env!("CARGO_PKG_VERSION"))
398 );
399
400 assert_eq!(transmission.options.max_batch_size, DEFAULT_MAX_BATCH_SIZE);
401 assert_eq!(transmission.options.batch_timeout, DEFAULT_BATCH_TIMEOUT);
402 assert_eq!(
403 transmission.options.max_concurrent_batches,
404 DEFAULT_MAX_CONCURRENT_BATCHES
405 );
406 assert_eq!(
407 transmission.options.pending_work_capacity,
408 DEFAULT_PENDING_WORK_CAPACITY
409 );
410 }
411
412 #[test]
413 fn test_modifiable_defaults() {
414 let transmission = Transmission::new(Options {
415 user_agent_addition: Some(" something/0.3".to_string()),
416 ..Options::default()
417 })
418 .unwrap();
419 assert_eq!(
420 transmission.options.user_agent_addition,
421 Some(" something/0.3".to_string())
422 );
423 }
424
425 #[test]
426 fn test_responses() {
427 use crate::fields::FieldHolder;
428
429 let mut transmission = Transmission::new(Options {
430 max_batch_size: 5,
431 ..Options::default()
432 })
433 .unwrap();
434 transmission.start();
435
436 let api_host = &mockito::server_url();
437 let _m = mockito::mock(
438 "POST",
439 mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()),
440 )
441 .with_status(200)
442 .with_header("content-type", "application/json")
443 .with_body(
444 r#"
445[
446 { "status":202 },
447 { "status":202 },
448 { "status":202 },
449 { "status":202 },
450 { "status":202 }
451]
452"#,
453 )
454 .create();
455
456 for i in 0..5 {
457 let mut event = Event::new(&client::Options {
458 api_key: "some_api_key".to_string(),
459 api_host: api_host.to_string(),
460 ..client::Options::default()
461 });
462 event.add_field("id", serde_json::from_str(&i.to_string()).unwrap());
463 transmission.send(event);
464 }
465 for (i, response) in transmission.responses().iter().enumerate() {
466 if i == 4 {
467 break;
468 }
469 assert_eq!(response.status_code, Some(StatusCode::ACCEPTED));
470 assert_eq!(response.body, None);
471 }
472 transmission.stop().unwrap();
473 }
474
475 #[test]
476 fn test_metadata() {
477 use serde_json::json;
478
479 let mut transmission = Transmission::new(Options {
480 max_batch_size: 1,
481 ..Options::default()
482 })
483 .unwrap();
484 transmission.start();
485
486 let api_host = &mockito::server_url();
487 let _m = mockito::mock(
488 "POST",
489 mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()),
490 )
491 .with_status(200)
492 .with_header("content-type", "application/json")
493 .with_body(
494 r#"
495[
496 { "status":202 }
497]
498"#,
499 )
500 .create();
501
502 let metadata = Some(json!("some metadata in a string"));
503 let mut event = Event::new(&client::Options {
504 api_key: "some_api_key".to_string(),
505 api_host: api_host.to_string(),
506 ..client::Options::default()
507 });
508 event.metadata = metadata.clone();
509 transmission.send(event);
510
511 if let Some(response) = transmission.responses().iter().next() {
512 assert_eq!(response.status_code, Some(StatusCode::ACCEPTED));
513 assert_eq!(response.metadata, metadata);
514 } else {
515 panic!("did not receive an expected response");
516 }
517 transmission.stop().unwrap();
518 }
519
520 #[test]
521 fn test_multiple_batches() {
522 use serde_json::json;
529 let mut transmission = Transmission::new(Options {
530 max_batch_size: 2,
531 batch_timeout: Duration::from_secs(5),
532 ..Options::default()
533 })
534 .unwrap();
535 transmission.start();
536
537 let api_host = &mockito::server_url();
538 let _m = mockito::mock(
539 "POST",
540 mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()),
541 )
542 .with_status(200)
543 .with_header("content-type", "application/json")
544 .with_body(
545 r#"
546[
547 { "status":202 },
548 { "status":202 }
549]"#,
550 )
551 .create();
552
553 let mut event1 = Event::new(&client::Options {
554 api_key: "some_api_key".to_string(),
555 api_host: api_host.to_string(),
556 dataset: "same".to_string(),
557 ..client::Options::default()
558 });
559 event1.metadata = Some(json!("event1"));
560 let mut event2 = event1.clone();
561 event2.metadata = Some(json!("event2"));
562 let mut event3 = event1.clone();
563 event3.options.dataset = "other".to_string();
564 event3.metadata = Some(json!("event3"));
565
566 transmission.send(event3);
567 transmission.send(event2);
568 transmission.send(event1);
569
570 let response1 = transmission.responses().iter().next().unwrap();
571 let response2 = transmission.responses().iter().next().unwrap();
572 let _ = transmission
573 .responses()
574 .recv_timeout(Duration::from_millis(250))
575 .err();
576
577 assert_eq!(response1.status_code, Some(StatusCode::ACCEPTED));
578 assert_eq!(response2.status_code, Some(StatusCode::ACCEPTED));
579
580 assert!(
582 response1.metadata == Some(json!("event1"))
583 || response1.metadata == Some(json!("event2"))
584 );
585 assert!(
586 response2.metadata == Some(json!("event1"))
587 || response2.metadata == Some(json!("event2"))
588 );
589 transmission.stop().unwrap();
590 }
591
592 #[test]
593 fn test_bad_response() {
594 use serde_json::json;
595
596 let mut transmission = Transmission::new(Options::default()).unwrap();
597 transmission.start();
598
599 let api_host = &mockito::server_url();
600 let _m = mockito::mock(
601 "POST",
602 mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()),
603 )
604 .with_status(400)
605 .with_header("content-type", "application/json")
606 .with_body("request body is malformed and cannot be read as JSON")
607 .create();
608
609 let mut event = Event::new(&client::Options {
610 api_key: "some_api_key".to_string(),
611 api_host: api_host.to_string(),
612 ..client::Options::default()
613 });
614
615 event.metadata = Some(json!("some metadata in a string"));
616 transmission.send(event);
617
618 if let Some(response) = transmission.responses().iter().next() {
619 assert_eq!(response.status_code, Some(StatusCode::BAD_REQUEST));
620 assert_eq!(
621 response.body,
622 Some("request body is malformed and cannot be read as JSON".to_string())
623 );
624 } else {
625 panic!("did not receive an expected response");
626 }
627 transmission.stop().unwrap();
628 }
629}