june_analytics/batcher.rs
1use crate::message::{Batch, BatchMessage, Message};
2use crate::{Error, Result};
3use serde_json::{Map, Value};
4use time::OffsetDateTime;
5
/// Maximum serialized size (in bytes) of a single message; larger messages
/// are rejected by `Batcher::push` with `Error::MessageTooLarge`.
const MAX_MESSAGE_SIZE: usize = 1024 * 32;
/// Maximum serialized size (in bytes) of a whole batch; once exceeded,
/// `Batcher::push` hands the message back so the caller can flush.
const MAX_BATCH_SIZE: usize = 1024 * 512;
8
/// A batcher can accept messages into an internal buffer, and report when
/// messages must be flushed.
///
/// The recommended usage pattern looks something like this:
///
/// ```
/// use june_analytics::{Batcher, Client, HttpClient};
/// use june_analytics::message::{BatchMessage, Track, User};
/// use serde_json::json;
///
/// let mut batcher = Batcher::new(None);
/// let client = HttpClient::default();
///
/// for i in 0..100 {
///     let msg = Track {
///         user: User::UserId { user_id: format!("user-{}", i) },
///         event: "Example".to_owned(),
///         properties: json!({ "foo": "bar" }),
///         ..Default::default()
///     };
///
///     // Batcher returns back ownership of a message if the internal buffer
///     // would overflow.
///     //
///     // When this occurs, we flush the batcher, create a new batcher, and add
///     // the message into the new batcher.
///     if let Some(msg) = batcher.push(msg).unwrap() {
///         client.send("your_write_key".to_string(), batcher.into_message());
///         batcher = Batcher::new(None);
///         batcher.push(msg).unwrap();
///     }
/// }
/// ```
///
/// Batcher will attempt to fit messages into maximally-sized batches, thus
/// reducing the number of round trips required with June's tracking API.
/// However, if you produce messages infrequently, this may significantly delay
/// the sending of messages to June.
///
/// If this delay is a concern, it is recommended that you periodically flush
/// the batcher on your own by calling `into_message`.
///
/// By default, if the message you push into the batcher does not contain any
/// timestamp, the timestamp at the time of the push will be automatically
/// added to your message.
/// You can disable this behaviour with the [`Self::without_auto_timestamp`]
/// method though.
#[derive(Clone, Debug)]
pub struct Batcher {
    // Accepted messages, in push order.
    pub(crate) buf: Vec<BatchMessage>,
    // Running serialized size of `buf` in bytes (including inter-message
    // comma separators), used to enforce `MAX_BATCH_SIZE`.
    pub(crate) byte_count: usize,
    // Optional context attached to every batch produced by `into_message`.
    pub(crate) context: Option<Value>,
    // When true (the default), messages without a timestamp are stamped
    // with the current UTC time at push.
    pub(crate) auto_timestamp: bool,
}
63
64impl Batcher {
65 /// Construct a new, empty batcher.
66 ///
67 /// Optionally, you may specify a `context` that should be set on every
68 /// batch returned by `into_message`.
69 pub fn new(context: Option<Value>) -> Self {
70 Self {
71 buf: Vec::new(),
72 byte_count: 0,
73 context,
74 auto_timestamp: true,
75 }
76 }
77
78 pub fn without_auto_timestamp(&mut self) {
79 self.auto_timestamp = false;
80 }
81
82 /// Push a message into the batcher.
83 ///
84 /// Returns `Ok(None)` if the message was accepted and is now owned by the
85 /// batcher.
86 ///
87 /// Returns `Ok(Some(msg))` if the message was rejected because the current
88 /// batch would be oversized if this message were accepted. The given
89 /// message is returned back, and it is recommended that you flush the
90 /// current batch before attempting to push `msg` in again.
91 ///
92 /// Returns an error if the message is too large to be sent to June's
93 /// API.
94 pub fn push(&mut self, msg: impl Into<BatchMessage>) -> Result<Option<BatchMessage>> {
95 let mut msg: BatchMessage = msg.into();
96 let timestamp = msg.timestamp_mut();
97 if self.auto_timestamp && timestamp.is_none() {
98 *timestamp = Some(OffsetDateTime::now_utc());
99 }
100 let size = serde_json::to_vec(&msg)?.len();
101 if size > MAX_MESSAGE_SIZE {
102 return Err(Error::MessageTooLarge);
103 }
104
105 self.byte_count += size + 1; // +1 to account for Serialized data's extra commas
106 if self.byte_count > MAX_BATCH_SIZE {
107 return Ok(Some(msg));
108 }
109
110 self.buf.push(msg);
111 Ok(None)
112 }
113
114 pub fn into_message(self) -> Message {
115 Message::Batch(Batch {
116 batch: self.buf,
117 context: self.context,
118 integrations: None,
119 extra: Map::default(),
120 })
121 }
122}
123
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::{Track, User};
    use serde_json::json;

    /// Pushing a single message and converting into a batch preserves both
    /// the message and the batch-level context.
    #[test]
    fn test_push_and_into() {
        let batch_msg = BatchMessage::Track(Track {
            ..Default::default()
        });

        let context = json!({
            "foo": "bar",
        });

        let mut batcher = Batcher::new(Some(context.clone()));
        batcher.without_auto_timestamp();
        let result = batcher.push(batch_msg.clone());
        assert_eq!(None, result.ok().unwrap());

        let batch = batcher.into_message();
        let inner_batch = match batch {
            Message::Batch(b) => b,
            _ => panic!("invalid message type"),
        };
        assert_eq!(context, inner_batch.context.unwrap());
        assert_eq!(1, inner_batch.batch.len());

        assert_eq!(inner_batch.batch, vec![batch_msg]);
    }

    /// A single message over MAX_MESSAGE_SIZE is rejected with an error.
    #[test]
    fn test_bad_message_size() {
        let batch_msg = Track {
            user: User::UserId {
                // 33 KiB of 'a' — just over the 32 KiB per-message limit.
                user_id: "a".repeat(1024 * 33),
            },
            ..Default::default()
        };

        let mut batcher = Batcher::new(None);
        let result = batcher.push(batch_msg);

        let err = result.err().unwrap();
        assert!(err.to_string().contains("message too large"));
    }

    /// Once the accumulated batch would exceed MAX_BATCH_SIZE, push hands
    /// ownership of the message back to the caller instead of accepting it.
    #[test]
    fn test_max_buffer() {
        let batch_msg = Track {
            user: User::UserId {
                // ~30 KiB per message: the 512 KiB batch cap is reached well
                // within 20 pushes.
                user_id: "a".repeat(1024 * 30),
            },
            ..Default::default()
        };

        let mut batcher = Batcher::new(None);
        batcher.without_auto_timestamp();
        let mut result = Ok(None);
        for _ in 0..20 {
            result = batcher.push(batch_msg.clone());
            if matches!(result, Ok(Some(_))) {
                break;
            }
        }

        let msg = result.ok().unwrap();
        assert_eq!(BatchMessage::from(batch_msg), msg.unwrap());
    }
}
195}