ultra_batch/batch_executor.rs
1use crate::Executor;
2use std::{borrow::Cow, sync::Arc};
3
/// Batches calls to an [`Executor`], such as for bulk inserting, updating,
/// or deleting records in a datastore. `BatchExecutor`s are asynchronous
/// and designed to be passed and shared between threads or tasks. Cloning
/// a `BatchExecutor` is shallow and will use the same underlying [`Executor`].
///
/// `BatchExecutor` is designed primarily for bulk database operations — for
/// example, inserting lots of records, where a single query to insert
/// 50 new records is much faster than 50 separate queries. However, it can
/// be used for fetching data or any other bulk operation as well.
///
/// Unlike [`BatchFetcher`](crate::BatchFetcher), `BatchExecutor` has no
/// concepts of keys, values, deduplication, or caching; each executed value
/// is passed directly to the underlying [`Executor`]. As such, it could also
/// be suitable for writing a custom caching layer in situations where
/// [`BatchFetcher`](crate::BatchFetcher) is not suitable.
///
/// `BatchExecutor`s introduce a small amount of latency for executions. Each
/// time a new value or set of values is sent for execution, it will first
/// wait for more values to build a batch. The execution will only trigger after
/// a timeout is reached or once enough values have been queued in the batch.
/// See [`BatchExecutorBuilder`] for options to tweak latency and batch sizes.
///
/// ## Execution semantics
///
/// If the underlying [`Executor`] returns an error during the batch execution,
/// then all pending [`execute`](BatchExecutor::execute) and [`execute_many`](BatchExecutor::execute_many)
/// requests will fail. The same values can be resubmitted to retry.
///
/// If the underlying [`Executor`] succeeds but does not return a `Vec` that
/// contains results for all values, then calls to [`execute`](BatchExecutor::execute)
/// may return `None`. Calls to [`execute_many`](BatchExecutor::execute_many)
/// may return a `Vec` containing fewer output values than input values.
pub struct BatchExecutor<E>
where
    E: Executor,
{
    // Human-readable label used only in tracing/log output; set via
    // `BatchExecutorBuilder::label`.
    label: Cow<'static, str>,
    // Handle to the background batching task. Shared via `Arc` so that
    // clones of this `BatchExecutor` keep referring to the same task; it is
    // never awaited, only held to tie its identity to this executor.
    _execute_task: Arc<tokio::task::JoinHandle<()>>,
    // Channel for submitting batches of values (paired with a oneshot sender
    // for the results) to the background task.
    execute_request_tx: tokio::sync::mpsc::Sender<ExecuteRequest<E::Value, E::Result>>,
}
44
45impl<E> BatchExecutor<E>
46where
47 E: Executor + Send + Sync + 'static,
48{
49 /// Create a new `BatchExecutor` athat uses the given [`Executor`] to
50 /// execute values. Returns a [`BatchExecutorBuilder`], which can be
51 /// used to customize the `BatchExecutor`. Call [`.finish()`](BatchExecutorBuilder::finish)
52 /// to create the `BatchExecutor`.
53 ///
54 /// # Examples
55 ///
56 /// Creating a `BatchExecutor` with default options:
57 ///
58 /// ```
59 /// # use ultra_batch::{BatchExecutor, Executor};
60 /// # struct UserInserter;
61 /// # impl UserInserter {
62 /// # fn new(db_conn: ()) -> Self { UserInserter }
63 /// # }
64 /// # impl Executor for UserInserter {
65 /// # type Value = ();
66 /// # type Result = ();
67 /// # type Error = anyhow::Error;
68 /// # async fn execute(&self, values: Vec<()>) -> anyhow::Result<Vec<()>> {
69 /// # unimplemented!();
70 /// # }
71 /// # }
72 /// # #[tokio::main] async fn main() -> anyhow::Result<()> {
73 /// # let db_conn = ();
74 /// let user_inserter = UserInserter::new(db_conn);
75 /// let batch_inserter = BatchExecutor::build(user_inserter).finish();
76 /// # Ok(())
77 /// # }
78 /// ```
79 ///
80 /// Creating a `BatchExecutor` with custom options:
81 ///
82 /// ```
83 /// # use ultra_batch::{BatchExecutor, Executor};
84 /// # struct UserInserter;
85 /// # impl UserInserter {
86 /// # fn new(db_conn: ()) -> Self { UserInserter }
87 /// # }
88 /// # impl Executor for UserInserter {
89 /// # type Value = ();
90 /// # type Result = ();
91 /// # type Error = anyhow::Error;
92 /// # async fn execute(&self, values: Vec<()>) -> anyhow::Result<Vec<()>> {
93 /// # unimplemented!();
94 /// # }
95 /// # }
96 /// # #[tokio::main] async fn main() -> anyhow::Result<()> {
97 /// # let db_conn = ();
98 /// let user_inserter = UserInserter::new(db_conn);
99 /// let batch_inserter = BatchExecutor::build(user_inserter)
100 /// .eager_batch_size(Some(50))
101 /// .delay_duration(tokio::time::Duration::from_millis(5))
102 /// .finish();
103 /// # Ok(())
104 /// # }
105 /// ```
106 pub fn build(executor: E) -> BatchExecutorBuilder<E> {
107 BatchExecutorBuilder {
108 executor,
109 delay_duration: tokio::time::Duration::from_millis(10),
110 eager_batch_size: Some(100),
111 label: "unlabeled-batch-executor".into(),
112 }
113 }
114
115 /// Submit a value to be executed by the [`Executor`]. Returns the
116 /// result value returned by the [`Executor`] for this given item. See
117 /// the type-level docs for [`BatchExecutor`](#execution-semantics) for
118 /// detailed execution semantics.
119 #[tracing::instrument(skip_all, fields(batch_executor = %self.label))]
120 pub async fn execute(&self, key: E::Value) -> Result<Option<E::Result>, ExecuteError> {
121 let mut values = self.execute_values(vec![key]).await?;
122 Ok(values.pop())
123 }
124
125 /// Submit multiple values to be executed by the [`Executor`]. Returns a
126 /// `Vec` containg values for each result returned by the [`Executor`]
127 /// for each given input value (but note that the returned `Vec` may
128 /// not have values for all inputs if the [`Executor`] did not return
129 /// enough results). See the type-level docs for [`BatchExecutor`](#execution-semantics)
130 /// for detailed execution semantics.
131 #[tracing::instrument(skip_all, fields(batch_executor = %self.label, num_values = values.len()))]
132 pub async fn execute_many(
133 &self,
134 values: Vec<E::Value>,
135 ) -> Result<Vec<E::Result>, ExecuteError> {
136 let results = self.execute_values(values).await?;
137 Ok(results)
138 }
139
140 async fn execute_values(&self, values: Vec<E::Value>) -> Result<Vec<E::Result>, ExecuteError> {
141 let execute_request_tx = self.execute_request_tx.clone();
142 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
143
144 tracing::debug!(
145 batch_executor = %self.label,
146 "sending a batch of values to execute",
147 );
148 let execute_request = ExecuteRequest { values, result_tx };
149 execute_request_tx
150 .send(execute_request)
151 .await
152 .map_err(|_| ExecuteError::SendError)?;
153
154 match result_rx.await {
155 Ok(Ok(results)) => {
156 tracing::debug!(batch_executor = %self.label, "fetch response returned successfully");
157 Ok(results)
158 }
159 Ok(Err(execute_error)) => {
160 tracing::info!("error returned while executing: {execute_error}");
161 Err(ExecuteError::ExecutorError(execute_error))
162 }
163 Err(recv_error) => {
164 panic!(
165 "Batch result channel for batch executor {} hung up with error: {recv_error}",
166 self.label,
167 );
168 }
169 }
170 }
171}
172
173impl<E> Clone for BatchExecutor<E>
174where
175 E: Executor,
176{
177 fn clone(&self) -> Self {
178 BatchExecutor {
179 _execute_task: self._execute_task.clone(),
180 execute_request_tx: self.execute_request_tx.clone(),
181 label: self.label.clone(),
182 }
183 }
184}
185
/// Used to configure a new [`BatchExecutor`]. A `BatchExecutorBuilder` is
/// returned from [`BatchExecutor::build`].
pub struct BatchExecutorBuilder<E>
where
    E: Executor + Send + Sync + 'static,
{
    // The executor that will receive each assembled batch.
    executor: E,
    // Maximum time to wait for additional values before running a batch.
    delay_duration: tokio::time::Duration,
    // Run the batch early once this many values are queued; `None` disables
    // eager dispatch so each batch always waits the full `delay_duration`.
    eager_batch_size: Option<usize>,
    // Label used only in diagnostics/log messages.
    label: Cow<'static, str>,
}
197
impl<E> BatchExecutorBuilder<E>
where
    E: Executor + Send + Sync + 'static,
{
    /// The maximum amount of time the [`BatchExecutor`] will wait to queue up
    /// more values before calling the [`Executor`].
    pub fn delay_duration(mut self, delay: tokio::time::Duration) -> Self {
        self.delay_duration = delay;
        self
    }

    /// The maximum number of values to wait for before eagerly calling the
    /// [`Executor`]. A value of `Some(n)` will run the batch once `n` or more
    /// values have been queued (or once the timeout set by
    /// [`delay_duration`](BatchExecutorBuilder::delay_duration) is reached,
    /// whichever comes first). A value of `None` will never eagerly dispatch
    /// the queue, and the [`BatchExecutor`] will always wait for the timeout
    /// set by [`delay_duration`](BatchExecutorBuilder::delay_duration).
    ///
    /// Note that `eager_batch_size` **does not** set an upper limit on the
    /// batch! For example, if [`BatchExecutor::execute_many`] is called with
    /// more than `eager_batch_size` items, then the batch will be sent
    /// immediately with _all_ of the provided values.
    pub fn eager_batch_size(mut self, eager_batch_size: Option<usize>) -> Self {
        self.eager_batch_size = eager_batch_size;
        self
    }

    /// Set a label for the [`BatchExecutor`]. This is only used to improve
    /// diagnostic messages, such as log messages.
    pub fn label(mut self, label: impl Into<Cow<'static, str>>) -> Self {
        self.label = label.into();
        self
    }

    /// Create and return a [`BatchExecutor`] with the given options.
    ///
    /// Spawns the background batching task, which loops: wait for a first
    /// request, accumulate more requests until either `eager_batch_size` is
    /// reached or `delay_duration` elapses, run the whole batch through the
    /// [`Executor`], then hand each caller its slice of the results.
    pub fn finish(self) -> BatchExecutor<E> {
        // Capacity 1: senders queue up behind the channel while the task
        // drains requests as it assembles each batch.
        let (execute_request_tx, mut execute_request_rx) =
            tokio::sync::mpsc::channel::<ExecuteRequest<E::Value, E::Result>>(1);
        let label = self.label.clone();

        // `self` (executor + options) moves into the task; the task exits
        // once every `BatchExecutor` clone (and thus every sender) is dropped.
        let execute_task = tokio::spawn({
            async move {
                'task: loop {
                    // Wait for some values to come in
                    let mut pending_values = vec![];
                    // Each entry pairs a caller's oneshot sender with the
                    // index in `pending_values` where that caller's values
                    // begin, so results can be sliced back out later.
                    let mut result_txs = vec![];

                    tracing::trace!(batch_executor = %self.label, "waiting for values to execute...");
                    match execute_request_rx.recv().await {
                        Some(execute_request) => {
                            tracing::trace!(batch_executor = %self.label, num_execute_request_values = execute_request.values.len(), "received initial execute request");

                            let result_start_index = pending_values.len();
                            pending_values.extend(execute_request.values);

                            result_txs.push((result_start_index, execute_request.result_tx));
                        }
                        None => {
                            // Execute queue closed, so we're done
                            break 'task;
                        }
                    };

                    // Wait for more values
                    'wait_for_more_values: loop {
                        // Checked before sleeping so an oversized first
                        // request dispatches immediately.
                        let should_run_batch_now = match self.eager_batch_size {
                            Some(eager_batch_size) => pending_values.len() >= eager_batch_size,
                            None => false,
                        };
                        if should_run_batch_now {
                            // We have enough values already, so don't wait for more
                            tracing::trace!(
                                batch_executor = %self.label,
                                num_pending_values = pending_values.len(),
                                eager_batch_size = ?self.eager_batch_size,
                                "batch filled up, ready to execute now",
                            );

                            break 'wait_for_more_values;
                        }

                        let delay = tokio::time::sleep(self.delay_duration);
                        tokio::pin!(delay);

                        // Race the next request against the batch timeout.
                        tokio::select! {
                            execute_request = execute_request_rx.recv() => {
                                match execute_request {
                                    Some(execute_request) => {
                                        tracing::trace!(batch_executor = %self.label, num_execute_request_values = execute_request.values.len(), "retrieved additional execute request");

                                        let result_start_index = pending_values.len();
                                        pending_values.extend(execute_request.values);

                                        result_txs.push((result_start_index, execute_request.result_tx));
                                    }
                                    None => {
                                        // Executor queue closed, so we're done waiting for keys
                                        tracing::debug!(batch_executor = %self.label, num_pending_values = pending_values.len(), "execute channel closed");
                                        break 'wait_for_more_values;
                                    }
                                }

                            }
                            _ = &mut delay => {
                                // Reached delay, so we're done waiting for keys
                                tracing::trace!(
                                    batch_executor = %self.label,
                                    num_pending_values = pending_values.len(),
                                    "delay reached while waiting for more values to fetch"
                                );
                                break 'wait_for_more_values;
                            }
                        };
                    }

                    tracing::trace!(batch_executor = %self.label, num_pending_values = pending_values.len(), num_pending_channels = result_txs.len(), "fetching values");
                    // Run the whole batch through the executor; errors are
                    // flattened to a `String` so they can be cloned to every
                    // waiting caller.
                    let mut result = self
                        .executor
                        .execute(pending_values)
                        .await
                        .map_err(|error| error.to_string());

                    // Distribute results. Iterating in reverse lets
                    // `split_off` peel each caller's slice off the end of the
                    // result `Vec` at its recorded start index.
                    for (result_range, result_tx) in result_txs.into_iter().rev() {
                        let result = match &mut result {
                            Ok(result) => {
                                if result_range <= result.len() {
                                    Ok(result.split_off(result_range))
                                } else {
                                    // Executor returned fewer results than
                                    // values; this caller receives none.
                                    Ok(vec![])
                                }
                            }
                            Err(error) => Err(error.clone()),
                        };

                        // Ignore error if receiver was already closed
                        let _ = result_tx.send(result);
                    }
                }
            }
        });

        BatchExecutor {
            label,
            _execute_task: Arc::new(execute_task),
            execute_request_tx,
        }
    }
}
348
/// Internal message sent from `BatchExecutor` handles to the background
/// batching task: a set of values to execute plus a oneshot channel for
/// returning that request's share of the batch results.
struct ExecuteRequest<V, R> {
    // Values to append to the current batch.
    values: Vec<V>,
    // Receives the executor results corresponding to `values`, or the
    // executor's error rendered as a `String`.
    result_tx: tokio::sync::oneshot::Sender<Result<Vec<R>, String>>,
}
353
354/// Error indicating that execution of one or more values from a
355/// [`BatchExecutor`] failed.
356#[derive(Debug, thiserror::Error)]
357pub enum ExecuteError {
358 /// The [`Executor`] returned an error while loading the batch. The message
359 /// contains the error message specified by [`Executor::Error`].
360 #[error("error while executing batch: {}", _0)]
361 ExecutorError(String),
362
363 /// The request could not be sent to the [`BatchExecutor`].
364 #[error("error sending execution request")]
365 SendError,
366}