// xet_data/file_reconstruction/data_writer/download_stream.rs

1use std::sync::Arc;
2
3use bytes::Bytes;
4use tokio::sync::Notify;
5use tokio::sync::mpsc::UnboundedReceiver;
6use tracing::info;
7
8use super::super::error::{FileReconstructionError, Result};
9use super::super::file_reconstructor::FileReconstructor;
10use super::super::run_state::RunState;
11use super::sequential_writer::{SequentialRetrievalItem, SequentialWriter};
12
13/// A streaming download handle that yields data chunks as they are reconstructed.
14///
15/// Created by [`FileReconstructor::reconstruct_to_stream`].  The reconstruction
16/// task is spawned immediately but pauses until [`start`](Self::start) is
17/// called (or the first [`next`](Self::next) / [`blocking_next`](Self::blocking_next)).
18/// Because the `tokio::spawn` happens at construction time, subsequent calls to
19/// `start()`, `next()`, and `blocking_next()` do **not** require a tokio runtime
20/// context.
21///
22/// Data is delivered by pulling items directly from the sequential writer's
23/// internal queue, bypassing the synchronous writer thread entirely. Each call
24/// to [`blocking_next`](Self::blocking_next) / [`next`](Self::next) returns the
25/// next sequential chunk, or `None` when the download is complete. Any
26/// reconstruction error is surfaced on the call that would have returned the
27/// next chunk (or on the final `None` boundary) via the shared run state.
28pub struct DownloadStream {
29    /// Channel receiver for sequential retrieval items from the writer queue.
30    receiver: UnboundedReceiver<SequentialRetrievalItem>,
31    /// Whether the stream has finished (no more data).
32    finished: bool,
33    /// Shared run state with the `FileReconstructor`. When cancelled,
34    /// the reconstruction loop aborts promptly at its next check point or
35    /// `select!` branch. Also used for progress reporting and error propagation.
36    run_state: Arc<RunState>,
37    /// Signal to unblock the spawned reconstruction task. `Some` means
38    /// `start()` has not yet been called; the spawned task is waiting.
39    start_signal: Option<Arc<Notify>>,
40}
41
42impl DownloadStream {
43    /// Creates a new `DownloadStream`, immediately spawning the reconstruction
44    /// task on the current tokio runtime.  The task blocks on an internal
45    /// [`Notify`] until [`start`](Self::start) is called.
46    ///
47    /// # Panics
48    ///
49    /// Panics if called outside a tokio runtime context.
50    pub(crate) fn new(reconstructor: FileReconstructor, run_state: Arc<RunState>) -> Self {
51        let (data_writer, receiver) = SequentialWriter::new_streaming(run_state.clone());
52        let start_signal = Arc::new(Notify::new());
53
54        let signal = start_signal.clone();
55        let rs = run_state.clone();
56        tokio::spawn(async move {
57            signal.notified().await;
58            info!(file_hash = %rs.file_hash(), "Starting download stream");
59            let _ = reconstructor.run(data_writer, rs, true).await;
60        });
61
62        Self {
63            receiver,
64            finished: false,
65            run_state,
66            start_signal: Some(start_signal),
67        }
68    }
69
70    pub(crate) fn abort_callback(&self) -> Box<dyn Fn() + Send + Sync> {
71        let run_state = self.run_state.clone();
72        let start_signal = self.start_signal.clone();
73        Box::new(move || {
74            run_state.cancel();
75            if let Some(signal) = start_signal.as_ref() {
76                signal.notify_one();
77            }
78        })
79    }
80
81    /// Unblocks the reconstruction task so it begins producing data.
82    ///
83    /// If already started, this is a no-op. Called automatically on the first
84    /// [`next`](Self::next) / [`blocking_next`](Self::blocking_next).
85    ///
86    /// This method is non-async and does not require a tokio runtime context.
87    pub fn start(&mut self) {
88        if let Some(signal) = self.start_signal.take() {
89            signal.notify_one();
90        }
91    }
92
93    fn ensure_started(&mut self) {
94        if self.start_signal.is_some() {
95            self.start();
96        }
97    }
98
99    fn cancel_reconstruction(&self) {
100        self.run_state.cancel();
101        if let Some(signal) = self.start_signal.as_ref() {
102            signal.notify_one();
103        }
104    }
105
106    /// Returns the next chunk of downloaded data, blocking the current thread
107    /// until data is available.
108    ///
109    /// Returns `Ok(None)` when the download is complete.
110    ///
111    /// # Panics
112    ///
113    /// Panics if called from within an async runtime context (e.g. inside a
114    /// `tokio::spawn` or `async fn`). Use from a regular thread or from
115    /// [`tokio::task::spawn_blocking`] instead. For the async-safe variant,
116    /// use [`next`](Self::next).
117    pub fn blocking_next(&mut self) -> Result<Option<Bytes>> {
118        if self.finished {
119            return Ok(None);
120        }
121        self.ensure_started();
122
123        match self.receiver.blocking_recv() {
124            Some(SequentialRetrievalItem::Data { receiver, permit }) => {
125                let data = match receiver.blocking_recv() {
126                    Ok(data) => data,
127                    Err(_) => {
128                        self.run_state.check_error()?;
129                        return Err(FileReconstructionError::InternalWriterError(
130                            "Data sender was dropped before sending data.".to_string(),
131                        ));
132                    },
133                };
134                self.run_state.report_bytes_written(data.len() as u64);
135                drop(permit);
136                Ok(Some(data))
137            },
138            Some(SequentialRetrievalItem::Finish) | None => {
139                self.finished = true;
140                self.run_state.check_error()?;
141                Ok(None)
142            },
143        }
144    }
145
146    /// Returns the next chunk of downloaded data asynchronously.
147    ///
148    /// Returns `Ok(None)` when the download is complete or cancelled.
149    pub async fn next(&mut self) -> Result<Option<Bytes>> {
150        if self.finished {
151            return Ok(None);
152        }
153        self.ensure_started();
154
155        let item = if let Ok(item) = self.receiver.try_recv() {
156            Some(item)
157        } else {
158            tokio::select! {
159                biased;
160                recv = self.receiver.recv() => recv,
161                _ = self.run_state.cancelled() => None,
162            }
163        };
164
165        match item {
166            Some(SequentialRetrievalItem::Data { receiver, permit }) => {
167                let data = match receiver.await {
168                    Ok(data) => data,
169                    Err(_) => {
170                        self.run_state.check_error()?;
171                        return Err(FileReconstructionError::InternalWriterError(
172                            "Data sender was dropped before sending data.".to_string(),
173                        ));
174                    },
175                };
176                self.run_state.report_bytes_written(data.len() as u64);
177                drop(permit);
178                Ok(Some(data))
179            },
180            Some(SequentialRetrievalItem::Finish) | None => {
181                self.finished = true;
182                self.run_state.check_error()?;
183                Ok(None)
184            },
185        }
186    }
187
188    /// Cancels the in-progress (or not-yet-started) download.
189    ///
190    /// Signals the shared run state so the reconstruction loop aborts at its
191    /// next check point or `select!` branch, and closes the channel receiver.
192    /// After calling this, subsequent calls to [`blocking_next`](Self::blocking_next)
193    /// / [`next`](Self::next) will return `Ok(None)`.
194    pub fn cancel(&mut self) {
195        self.cancel_reconstruction();
196        let _ = self.start_signal.take();
197        self.receiver.close();
198        self.finished = true;
199    }
200}
201
202impl Drop for DownloadStream {
203    fn drop(&mut self) {
204        self.cancel_reconstruction();
205        self.receiver.close();
206    }
207}