june_analytics/
batcher.rs

1use crate::message::{Batch, BatchMessage, Message};
2use crate::{Error, Result};
3use serde_json::{Map, Value};
4use time::OffsetDateTime;
5
/// Maximum serialized size in bytes of a single message (32 KiB).
const MAX_MESSAGE_SIZE: usize = 1024 * 32;
/// Maximum serialized size in bytes of a whole batch (512 KiB).
const MAX_BATCH_SIZE: usize = 1024 * 512;
8
/// A batcher can accept messages into an internal buffer, and report when
/// messages must be flushed.
///
/// The recommended usage pattern looks something like this:
///
/// ```
/// use june_analytics::{Batcher, Client, HttpClient};
/// use june_analytics::message::{BatchMessage, Track, User};
/// use serde_json::json;
///
/// let mut batcher = Batcher::new(None);
/// let client = HttpClient::default();
///
/// for i in 0..100 {
///     let msg = Track {
///         user: User::UserId { user_id: format!("user-{}", i) },
///         event: "Example".to_owned(),
///         properties: json!({ "foo": "bar" }),
///         ..Default::default()
///     };
///
///     // Batcher returns back ownership of a message if the internal buffer
///     // would overflow.
///     //
///     // When this occurs, we flush the batcher, create a new batcher, and add
///     // the message into the new batcher.
///     if let Some(msg) = batcher.push(msg).unwrap() {
///         client.send("your_write_key".to_string(), batcher.into_message());
///         batcher = Batcher::new(None);
///         batcher.push(msg).unwrap();
///     }
/// }
/// ```
///
/// Batcher will attempt to fit messages into maximally-sized batches, thus
/// reducing the number of round trips required with June's tracking API.
/// However, if you produce messages infrequently, this may significantly delay
/// the sending of messages to June.
///
/// If this delay is a concern, it is recommended that you periodically flush
/// the batcher on your own by calling `into_message`.
///
/// By default, if the message you push in the batcher does not contain any
/// timestamp, the timestamp at the time of the push will be automatically
/// added to your message.
/// You can disable this behaviour with the [`Batcher::without_auto_timestamp`]
/// method though.
#[derive(Clone, Debug)]
pub struct Batcher {
    // Messages accepted so far; sent together as one batch.
    pub(crate) buf: Vec<BatchMessage>,
    // Running total of the serialized size of `buf`, in bytes, including one
    // extra byte per message for the separating comma in the JSON array.
    pub(crate) byte_count: usize,
    // Optional context attached to every batch produced by `into_message`.
    pub(crate) context: Option<Value>,
    // When true (the default), `push` stamps messages that lack a timestamp.
    pub(crate) auto_timestamp: bool,
}
63
64impl Batcher {
65    /// Construct a new, empty batcher.
66    ///
67    /// Optionally, you may specify a `context` that should be set on every
68    /// batch returned by `into_message`.
69    pub fn new(context: Option<Value>) -> Self {
70        Self {
71            buf: Vec::new(),
72            byte_count: 0,
73            context,
74            auto_timestamp: true,
75        }
76    }
77
78    pub fn without_auto_timestamp(&mut self) {
79        self.auto_timestamp = false;
80    }
81
82    /// Push a message into the batcher.
83    ///
84    /// Returns `Ok(None)` if the message was accepted and is now owned by the
85    /// batcher.
86    ///
87    /// Returns `Ok(Some(msg))` if the message was rejected because the current
88    /// batch would be oversized if this message were accepted. The given
89    /// message is returned back, and it is recommended that you flush the
90    /// current batch before attempting to push `msg` in again.
91    ///
92    /// Returns an error if the message is too large to be sent to June's
93    /// API.
94    pub fn push(&mut self, msg: impl Into<BatchMessage>) -> Result<Option<BatchMessage>> {
95        let mut msg: BatchMessage = msg.into();
96        let timestamp = msg.timestamp_mut();
97        if self.auto_timestamp && timestamp.is_none() {
98            *timestamp = Some(OffsetDateTime::now_utc());
99        }
100        let size = serde_json::to_vec(&msg)?.len();
101        if size > MAX_MESSAGE_SIZE {
102            return Err(Error::MessageTooLarge);
103        }
104
105        self.byte_count += size + 1; // +1 to account for Serialized data's extra commas
106        if self.byte_count > MAX_BATCH_SIZE {
107            return Ok(Some(msg));
108        }
109
110        self.buf.push(msg);
111        Ok(None)
112    }
113
114    pub fn into_message(self) -> Message {
115        Message::Batch(Batch {
116            batch: self.buf,
117            context: self.context,
118            integrations: None,
119            extra: Map::default(),
120        })
121    }
122}
123
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::{Track, User};
    use serde_json::json;

    /// Pushing a single message and converting to a batch preserves both the
    /// message and the configured context.
    #[test]
    fn test_push_and_into() {
        let batch_msg = BatchMessage::Track(Track::default());

        let context = json!({
            "foo": "bar",
        });

        let mut batcher = Batcher::new(Some(context.clone()));
        batcher.without_auto_timestamp();
        let result = batcher.push(batch_msg.clone());
        assert_eq!(None, result.ok().unwrap());

        let inner_batch = match batcher.into_message() {
            Message::Batch(b) => b,
            _ => panic!("invalid message type"),
        };
        assert_eq!(context, inner_batch.context.unwrap());
        assert_eq!(1, inner_batch.batch.len());

        assert_eq!(inner_batch.batch, vec![batch_msg]);
    }

    /// A single message larger than MAX_MESSAGE_SIZE is rejected with an
    /// error rather than being buffered.
    #[test]
    fn test_bad_message_size() {
        let batch_msg = Track {
            user: User::UserId {
                // 33 KiB payload — just over the 32 KiB per-message limit.
                user_id: "a".repeat(1024 * 33),
            },
            ..Default::default()
        };

        let mut batcher = Batcher::new(None);
        let err = batcher.push(batch_msg).err().unwrap();
        assert!(err.to_string().contains("message too large"));
    }

    /// Once the accumulated batch would exceed MAX_BATCH_SIZE, push hands the
    /// rejected message back to the caller.
    #[test]
    fn test_max_buffer() {
        let batch_msg = Track {
            user: User::UserId {
                // ~30 KiB per message; roughly 17 of these overflow the
                // 512 KiB batch limit, well within the 20 iterations below.
                user_id: "a".repeat(1024 * 30),
            },
            ..Default::default()
        };

        let mut batcher = Batcher::new(None);
        batcher.without_auto_timestamp();
        let mut result = Ok(None);
        for _ in 0..20 {
            result = batcher.push(batch_msg.clone());
            if matches!(result, Ok(Some(_))) {
                break;
            }
        }

        let rejected = result.ok().unwrap();
        assert_eq!(BatchMessage::from(batch_msg), rejected.unwrap());
    }
}
195}