tokio_process_tools/collector.rs
1use crate::output_stream::Next;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use thiserror::Error;
6use tokio::sync::oneshot::Sender;
7use tokio::task::JoinHandle;
8
9#[derive(Debug, Error)]
10pub enum CollectorError {
11 #[error("The collector task could not be joined/terminated: {0}")]
12 TaskJoin(#[source] tokio::task::JoinError),
13}
14
15pub trait Sink: Debug + Send + Sync + 'static {}
16
17impl<T> Sink for T where T: Debug + Send + Sync + 'static {}
18
19// NOTE: We use Pin<Box> here to force usage of Higher-Rank Trait Bounds (HRTBs).
20// The returned futures will most-likely capture the `&mut T`and are therefore poised
21// by its lifetime. Without the trait-object usage, this would not work.
22pub type AsyncCollectFn<'a> = Pin<Box<dyn Future<Output = Next> + Send + 'a>>;
23
24/// A collector for stream data, inspecting it chunk by chunk but also providing mutable access
25/// to a sink in which the data can be stored.
26///
27/// See the `collect_*` functions on `BroadcastOutputStream` and `SingleOutputStream`.
28///
29/// For proper cleanup, call
30/// - `wait()`, which waits for the collection task to complete.
31/// - `cancel()`, which sends a termination signal and then waits for the collection task to complete.
32///
33/// If not cleaned up, the termination signal will be sent when dropping this collector,
34/// but the task will be aborted (forceful, not waiting for its regular completion).
35pub struct Collector<S: Sink> {
36 pub(crate) task: Option<JoinHandle<S>>,
37 pub(crate) task_termination_sender: Option<Sender<()>>,
38}
39
40impl<S: Sink> Collector<S> {
41 /// Checks if this task has finished.
42 pub fn is_finished(&self) -> bool {
43 self.task.as_ref().map(|t| t.is_finished()).unwrap_or(true)
44 }
45
46 /// Wait for the collector to terminate naturally.
47 ///
48 /// A collector will automatically terminate when either:
49 ///
50 /// 1. The underlying write-side of the stream is dropped.
51 /// 2. The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
52 /// 3. The first `Next::Break` is observed.
53 ///
54 /// If none of these may occur in your case, this could/will hang forever!
55 pub async fn wait(mut self) -> Result<S, CollectorError> {
56 // Take the `task_termination_sender`. Let's make sure nobody can ever interfere with us
57 // waiting here. DO NOT drop it, or the task will terminate (at least if it also takes the
58 // receive-error as a signal to terminate)!
59 let tts = self.task_termination_sender.take();
60
61 let sink = self
62 .task
63 .take()
64 .expect("`task` to be present.")
65 .await
66 .map_err(CollectorError::TaskJoin);
67
68 // Drop the termination sender, we don't need it. Task is now terminated.
69 drop(tts);
70
71 sink
72 }
73
74 /// Sends a cancellation event to the collector, letting it shut down.
75 pub async fn cancel(mut self) -> Result<S, CollectorError> {
76 // We ignore any potential error here.
77 // Sending may fail if the task is already terminated (for example, by reaching EOF),
78 // which in turn dropped the receiver end!
79 let _res = self
80 .task_termination_sender
81 .take()
82 .expect("`task_termination_sender` to be present.")
83 .send(());
84
85 self.wait().await
86 }
87}
88
89impl<S: Sink> Drop for Collector<S> {
90 fn drop(&mut self) {
91 if let Some(task_termination_sender) = self.task_termination_sender.take() {
92 // We ignore any potential error here.
93 // Sending may fail if the task is already terminated (for example, by reaching EOF),
94 // which in turn dropped the receiver end!
95 let _res = task_termination_sender.send(());
96 }
97 if let Some(task) = self.task.take() {
98 task.abort();
99 }
100 }
101}