// ultra_batch/batch_executor.rs

1use crate::Executor;
2use std::{borrow::Cow, sync::Arc};
3
/// Batches calls to an [`Executor`], such as for bulk inserting, updating,
/// or deleting records in a datastore. `BatchExecutor`s are asynchronous
/// and designed to be passed and shared between threads or tasks. Cloning
/// a `BatchExecutor` is shallow and will use the same underlying [`Executor`].
///
/// `BatchExecutor` is designed primarily for bulk database operations-- for
/// example, inserting lots of records, where a single query to insert
/// 50 new records is much faster than 50 separate queries. However, it can
/// be used for fetching data or any other bulk operation as well.
///
/// Unlike [`BatchFetcher`](crate::BatchFetcher), `BatchExecutor` has no
/// concepts of keys, values, deduplication, or caching; each executed value
/// is passed directly to the underlying [`Executor`]. As such, it could also
/// be suitable for writing a custom caching layer in situations where
/// [`BatchFetcher`](crate::BatchFetcher) is not suitable.
///
/// `BatchExecutor`s introduce a small amount of latency for executions. Each
/// time a new value or set of values is sent for execution, it will first
/// wait for more values to build a batch. The execution will only trigger after
/// a timeout is reached or once enough values have been queued in the batch.
/// See [`BatchExecutorBuilder`] for options to tweak latency and batch sizes.
///
/// ## Execution semantics
///
/// If the underlying [`Executor`] returns an error during the batch execution,
/// then all pending [`execute`](BatchExecutor::execute) and [`execute_many`](BatchExecutor::execute_many)
/// requests will fail. The same values can be resubmitted to retry.
///
/// If the underlying [`Executor`] succeeds but does not return a `Vec` that
/// contains results for all values, then calls to [`execute`](BatchExecutor::execute)
/// may return `None`. Calls to [`execute_many`](BatchExecutor::execute_many)
/// may return a `Vec` containing fewer output values than input values.
pub struct BatchExecutor<E>
where
    E: Executor,
{
    // Human-readable label used only for log/trace diagnostics.
    label: Cow<'static, str>,
    // Background task that owns the executor and processes batches. Held in
    // an `Arc` so clones share the same task; never awaited directly.
    _execute_task: Arc<tokio::task::JoinHandle<()>>,
    // Channel used to submit batches of values (plus a oneshot reply channel
    // for the results) to the background task.
    execute_request_tx: tokio::sync::mpsc::Sender<ExecuteRequest<E::Value, E::Result>>,
}
44
impl<E> BatchExecutor<E>
where
    E: Executor + Send + Sync + 'static,
{
    /// Create a new `BatchExecutor` that uses the given [`Executor`] to
    /// execute values. Returns a [`BatchExecutorBuilder`], which can be
    /// used to customize the `BatchExecutor`. Call [`.finish()`](BatchExecutorBuilder::finish)
    /// to create the `BatchExecutor`.
    ///
    /// # Examples
    ///
    /// Creating a `BatchExecutor` with default options:
    ///
    /// ```
    /// # use ultra_batch::{BatchExecutor, Executor};
    /// # struct UserInserter;
    /// # impl UserInserter {
    /// #     fn new(db_conn: ()) -> Self { UserInserter }
    /// #  }
    /// # impl Executor for UserInserter {
    /// #     type Value = ();
    /// #     type Result = ();
    /// #     type Error = anyhow::Error;
    /// #     async fn execute(&self, values: Vec<()>) -> anyhow::Result<Vec<()>> {
    /// #         unimplemented!();
    /// #     }
    /// # }
    /// # #[tokio::main] async fn main() -> anyhow::Result<()> {
    /// # let db_conn = ();
    /// let user_inserter = UserInserter::new(db_conn);
    /// let batch_inserter = BatchExecutor::build(user_inserter).finish();
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Creating a `BatchExecutor` with custom options:
    ///
    /// ```
    /// # use ultra_batch::{BatchExecutor, Executor};
    /// # struct UserInserter;
    /// # impl UserInserter {
    /// #     fn new(db_conn: ()) -> Self { UserInserter }
    /// #  }
    /// # impl Executor for UserInserter {
    /// #     type Value = ();
    /// #     type Result = ();
    /// #     type Error = anyhow::Error;
    /// #     async fn execute(&self, values: Vec<()>) -> anyhow::Result<Vec<()>> {
    /// #         unimplemented!();
    /// #     }
    /// # }
    /// # #[tokio::main] async fn main() -> anyhow::Result<()> {
    /// # let db_conn = ();
    /// let user_inserter = UserInserter::new(db_conn);
    /// let batch_inserter = BatchExecutor::build(user_inserter)
    ///     .eager_batch_size(Some(50))
    ///     .delay_duration(tokio::time::Duration::from_millis(5))
    ///     .finish();
    /// # Ok(())
    /// # }
    /// ```
    pub fn build(executor: E) -> BatchExecutorBuilder<E> {
        BatchExecutorBuilder {
            executor,
            // Defaults: wait up to 10ms for more values, and dispatch eagerly
            // once 100 values are queued. Both are tunable via the builder.
            delay_duration: tokio::time::Duration::from_millis(10),
            eager_batch_size: Some(100),
            label: "unlabeled-batch-executor".into(),
        }
    }

    /// Submit a value to be executed by the [`Executor`]. Returns the
    /// result value returned by the [`Executor`] for this given item. See
    /// the type-level docs for [`BatchExecutor`](#execution-semantics) for
    /// detailed execution semantics.
    #[tracing::instrument(skip_all, fields(batch_executor = %self.label))]
    pub async fn execute(&self, key: E::Value) -> Result<Option<E::Result>, ExecuteError> {
        // Delegate to the batch path with a single-element batch; `pop`
        // yields `None` when the executor returned no result for this value.
        let mut values = self.execute_values(vec![key]).await?;
        Ok(values.pop())
    }

    /// Submit multiple values to be executed by the [`Executor`]. Returns a
    /// `Vec` containing values for each result returned by the [`Executor`]
    /// for each given input value (but note that the returned `Vec` may
    /// not have values for all inputs if the [`Executor`] did not return
    /// enough results). See the type-level docs for [`BatchExecutor`](#execution-semantics)
    /// for detailed execution semantics.
    #[tracing::instrument(skip_all, fields(batch_executor = %self.label, num_values = values.len()))]
    pub async fn execute_many(
        &self,
        values: Vec<E::Value>,
    ) -> Result<Vec<E::Result>, ExecuteError> {
        let results = self.execute_values(values).await?;
        Ok(results)
    }

    // Shared implementation for `execute` and `execute_many`: sends the
    // values to the background task and awaits its response.
    async fn execute_values(&self, values: Vec<E::Value>) -> Result<Vec<E::Result>, ExecuteError> {
        let execute_request_tx = self.execute_request_tx.clone();
        // One-shot channel the background task uses to reply with the results
        // (or a stringified executor error) for this particular request.
        let (result_tx, result_rx) = tokio::sync::oneshot::channel();

        tracing::debug!(
            batch_executor = %self.label,
            "sending a batch of values to execute",
        );
        let execute_request = ExecuteRequest { values, result_tx };
        // A send failure means the background task's receiver was dropped.
        execute_request_tx
            .send(execute_request)
            .await
            .map_err(|_| ExecuteError::SendError)?;

        match result_rx.await {
            Ok(Ok(results)) => {
                tracing::debug!(batch_executor = %self.label, "fetch response returned successfully");
                Ok(results)
            }
            Ok(Err(execute_error)) => {
                tracing::info!("error returned while executing: {execute_error}");
                Err(ExecuteError::ExecutorError(execute_error))
            }
            Err(recv_error) => {
                // The background task dropped our reply channel without
                // responding. NOTE(review): the task replies to every queued
                // request, so this is treated as an unrecoverable bug.
                panic!(
                    "Batch result channel for batch executor {} hung up with error: {recv_error}",
                    self.label,
                );
            }
        }
    }
}
172
173impl<E> Clone for BatchExecutor<E>
174where
175    E: Executor,
176{
177    fn clone(&self) -> Self {
178        BatchExecutor {
179            _execute_task: self._execute_task.clone(),
180            execute_request_tx: self.execute_request_tx.clone(),
181            label: self.label.clone(),
182        }
183    }
184}
185
/// Used to configure a new [`BatchExecutor`]. A `BatchExecutorBuilder` is
/// returned from [`BatchExecutor::build`].
pub struct BatchExecutorBuilder<E>
where
    E: Executor + Send + Sync + 'static,
{
    // The executor that will receive each assembled batch of values.
    executor: E,
    // Maximum time to wait for additional values before running a batch.
    delay_duration: tokio::time::Duration,
    // If `Some(n)`, run the batch as soon as `n` or more values are queued;
    // if `None`, always wait the full `delay_duration`.
    eager_batch_size: Option<usize>,
    // Diagnostic label attached to log/trace output.
    label: Cow<'static, str>,
}
197
impl<E> BatchExecutorBuilder<E>
where
    E: Executor + Send + Sync + 'static,
{
    /// The maximum amount of time the [`BatchExecutor`] will wait to queue up
    /// more values before calling the [`Executor`].
    pub fn delay_duration(mut self, delay: tokio::time::Duration) -> Self {
        self.delay_duration = delay;
        self
    }

    /// The maximum number of values to wait for before eagerly calling the
    /// [`Executor`]. A value of `Some(n)` will run the batch once `n` or more
    /// values have been queued (or once the timeout set by
    /// [`delay_duration`](BatchExecutorBuilder::delay_duration) is reached,
    /// whichever comes first). A value of `None` will never eagerly dispatch
    /// the queue, and the [`BatchExecutor`] will always wait for the timeout
    /// set by [`delay_duration`](BatchExecutorBuilder::delay_duration).
    ///
    /// Note that `eager_batch_size` **does not** set an upper limit on the
    /// batch! For example, if [`BatchExecutor::execute_many`] is called with
    /// more than `eager_batch_size` items, then the batch will be sent
    /// immediately with _all_ of the provided values.
    pub fn eager_batch_size(mut self, eager_batch_size: Option<usize>) -> Self {
        self.eager_batch_size = eager_batch_size;
        self
    }

    /// Set a label for the [`BatchExecutor`]. This is only used to improve
    /// diagnostic messages, such as log messages.
    pub fn label(mut self, label: impl Into<Cow<'static, str>>) -> Self {
        self.label = label.into();
        self
    }

    /// Create and return a [`BatchExecutor`] with the given options.
    pub fn finish(self) -> BatchExecutor<E> {
        // Bounded (capacity 1) channel used to submit execute requests to
        // the background task spawned below.
        let (execute_request_tx, mut execute_request_rx) =
            tokio::sync::mpsc::channel::<ExecuteRequest<E::Value, E::Result>>(1);
        let label = self.label.clone();

        // Background task: repeatedly collects requests into a batch, runs
        // the executor once, then distributes results back to each requester.
        let execute_task = tokio::spawn({
            async move {
                'task: loop {
                    // Wait for some values to come in
                    let mut pending_values = vec![];
                    // Each entry pairs a requester's reply channel with the
                    // index in `pending_values` where its values begin.
                    let mut result_txs = vec![];

                    tracing::trace!(batch_executor = %self.label, "waiting for values to execute...");
                    match execute_request_rx.recv().await {
                        Some(execute_request) => {
                            tracing::trace!(batch_executor = %self.label, num_execute_request_values = execute_request.values.len(), "received initial execute request");

                            let result_start_index = pending_values.len();
                            pending_values.extend(execute_request.values);

                            result_txs.push((result_start_index, execute_request.result_tx));
                        }
                        None => {
                            // Execute queue closed (all senders dropped), so we're done
                            break 'task;
                        }
                    };

                    // Wait for more values
                    'wait_for_more_values: loop {
                        // `None` eager batch size means "never dispatch early".
                        let should_run_batch_now = match self.eager_batch_size {
                            Some(eager_batch_size) => pending_values.len() >= eager_batch_size,
                            None => false,
                        };
                        if should_run_batch_now {
                            // We have enough values already, so don't wait for more
                            tracing::trace!(
                                batch_executor = %self.label,
                                num_pending_values = pending_values.len(),
                                eager_batch_size = ?self.eager_batch_size,
                                "batch filled up, ready to execute now",
                            );

                            break 'wait_for_more_values;
                        }

                        // NOTE(review): a fresh sleep is created on each loop
                        // iteration, so the delay bounds the gap between
                        // consecutive requests rather than the batch's total
                        // wait time.
                        let delay = tokio::time::sleep(self.delay_duration);
                        tokio::pin!(delay);

                        tokio::select! {
                            execute_request = execute_request_rx.recv() => {
                                match execute_request {
                                    Some(execute_request) => {
                                        tracing::trace!(batch_executor = %self.label, num_execute_request_values = execute_request.values.len(), "retrieved additional execute request");

                                        let result_start_index = pending_values.len();
                                        pending_values.extend(execute_request.values);

                                        result_txs.push((result_start_index, execute_request.result_tx));
                                    }
                                    None => {
                                        // Executor queue closed, so we're done waiting for values;
                                        // still run the batch we have collected so far.
                                        tracing::debug!(batch_executor = %self.label, num_pending_values = pending_values.len(), "execute channel closed");
                                        break 'wait_for_more_values;
                                    }
                                }

                            }
                            _ = &mut delay => {
                                // Reached delay, so we're done waiting for values
                                tracing::trace!(
                                    batch_executor = %self.label,
                                    num_pending_values = pending_values.len(),
                                    "delay reached while waiting for more values to fetch"
                                );
                                break 'wait_for_more_values;
                            }
                        };
                    }

                    tracing::trace!(batch_executor = %self.label, num_pending_values = pending_values.len(), num_pending_channels = result_txs.len(), "fetching values");
                    // Errors are stringified so one error can be cloned and
                    // delivered to every pending requester.
                    let mut result = self
                        .executor
                        .execute(pending_values)
                        .await
                        .map_err(|error| error.to_string());

                    // Walk requests in reverse so each `split_off` peels that
                    // request's slice off the tail of the combined results.
                    for (result_range, result_tx) in result_txs.into_iter().rev() {
                        let result = match &mut result {
                            Ok(result) => {
                                if result_range <= result.len() {
                                    Ok(result.split_off(result_range))
                                } else {
                                    // Executor returned fewer results than
                                    // values; this request gets none.
                                    Ok(vec![])
                                }
                            }
                            Err(error) => Err(error.clone()),
                        };

                        // Ignore error if receiver was already closed
                        let _ = result_tx.send(result);
                    }
                }
            }
        });

        BatchExecutor {
            label,
            // Keep the task handle alive so clones share ownership of it; the
            // task itself exits once every request sender has been dropped.
            _execute_task: Arc::new(execute_task),
            execute_request_tx,
        }
    }
}
348
/// A batch of values submitted for execution, paired with a one-shot channel
/// used to deliver the results back to the submitter.
struct ExecuteRequest<V, R> {
    // Values to include in the next batch passed to the `Executor`.
    values: Vec<V>,
    // Receives `Ok` with this request's slice of the executor results, or
    // `Err` with the stringified executor error.
    result_tx: tokio::sync::oneshot::Sender<Result<Vec<R>, String>>,
}
353
354/// Error indicating that execution of one or more values from a
355/// [`BatchExecutor`] failed.
356#[derive(Debug, thiserror::Error)]
357pub enum ExecuteError {
358    /// The [`Executor`] returned an error while loading the batch. The message
359    /// contains the error message specified by [`Executor::Error`].
360    #[error("error while executing batch: {}", _0)]
361    ExecutorError(String),
362
363    /// The request could not be sent to the [`BatchExecutor`].
364    #[error("error sending execution request")]
365    SendError,
366}