Skip to main content

xet_data/file_reconstruction/data_writer/
unordered_download_stream.rs

1use std::sync::Arc;
2use std::sync::atomic::Ordering;
3
4use bytes::Bytes;
5use tokio::sync::Notify;
6use tokio::sync::mpsc::UnboundedReceiver;
7use tracing::info;
8
9use super::super::error::Result;
10use super::super::file_reconstructor::FileReconstructor;
11use super::super::run_state::RunState;
12use super::unordered_writer::{CompletedTerm, UnorderedWriterProgress};
13
/// A streaming download handle that yields data chunks in completion order,
/// each tagged with its byte offset in the output file.
///
/// Created by [`FileReconstructor::reconstruct_to_unordered_stream`]. The
/// reconstruction task is spawned immediately but pauses until
/// [`start`](Self::start) is called (or the first [`next`](Self::next) /
/// [`blocking_next`](Self::blocking_next)). Because the `tokio::spawn`
/// happens at construction time, subsequent calls to `start()`, `next()`,
/// and `blocking_next()` do **not** require a tokio runtime context.
///
/// Unlike [`DownloadStream`](super::download_stream::DownloadStream), data
/// chunks may arrive out of order. Each chunk is returned as `(offset, Bytes)`
/// so the consumer knows where it belongs. Progress can be monitored via
/// the tracking methods which read shared atomic counters.
///
/// Holds only `Arc<UnorderedWriterProgress>`, not the writer itself, so the
/// channel sender is dropped naturally when the reconstruction task finishes.
///
/// Dropping the stream cancels the shared run state and closes the receiver.
pub struct UnorderedDownloadStream {
    /// Shared atomic progress counters (also held by the writer and its tasks).
    progress: Arc<UnorderedWriterProgress>,

    /// Channel receiver for completed terms from spawned tasks. Each item is
    /// either a completed term or the error that ended it.
    receiver: UnboundedReceiver<Result<CompletedTerm>>,

    /// Whether the stream has finished (no more data). Set once end-of-stream
    /// has been observed by `next()` / `blocking_next()`, or by `cancel()`.
    finished: bool,

    /// Shared run state with the `FileReconstructor`. Used for cancellation,
    /// error lookup at end-of-stream, and byte accounting.
    run_state: Arc<RunState>,

    /// Signal to unblock the spawned reconstruction task. `Some` means
    /// `start()` has not yet been called; the spawned task is waiting.
    /// Becomes `None` after `start()` or `cancel()`.
    start_signal: Option<Arc<Notify>>,
}
48
49impl UnorderedDownloadStream {
50    /// Creates a new `UnorderedDownloadStream`, immediately spawning the
51    /// reconstruction task on the current tokio runtime. The task blocks
52    /// on an internal [`Notify`] until [`start`](Self::start) is called.
53    ///
54    /// # Panics
55    ///
56    /// Panics if called outside a tokio runtime context.
57    pub(crate) fn new(reconstructor: FileReconstructor, run_state: Arc<RunState>) -> Self {
58        use super::unordered_writer::UnorderedWriter;
59
60        let (writer, receiver, progress) = UnorderedWriter::new_streaming(run_state.clone());
61        let start_signal = Arc::new(Notify::new());
62
63        let signal = start_signal.clone();
64        let rs = run_state.clone();
65        tokio::spawn(async move {
66            signal.notified().await;
67            info!(file_hash = %rs.file_hash(), "Starting unordered download stream");
68            let _ = reconstructor.run(writer, rs, true).await;
69        });
70
71        Self {
72            progress,
73            receiver,
74            finished: false,
75            run_state,
76            start_signal: Some(start_signal),
77        }
78    }
79
80    pub(crate) fn abort_callback(&self) -> Box<dyn Fn() + Send + Sync> {
81        let run_state = self.run_state.clone();
82        let start_signal = self.start_signal.clone();
83        Box::new(move || {
84            run_state.cancel();
85            if let Some(signal) = start_signal.as_ref() {
86                signal.notify_one();
87            }
88        })
89    }
90
91    /// Unblocks the reconstruction task so it begins producing data.
92    ///
93    /// If already started, this is a no-op. Called automatically on the first
94    /// [`next`](Self::next) / [`blocking_next`](Self::blocking_next).
95    ///
96    /// This method is non-async and does not require a tokio runtime context.
97    pub fn start(&mut self) {
98        if let Some(signal) = self.start_signal.take() {
99            signal.notify_one();
100        }
101    }
102
103    fn ensure_started(&mut self) {
104        if self.start_signal.is_some() {
105            self.start();
106        }
107    }
108
109    fn cancel_reconstruction(&self) {
110        self.run_state.cancel();
111        if let Some(signal) = self.start_signal.as_ref() {
112            signal.notify_one();
113        }
114    }
115
116    /// Returns the next chunk of downloaded data with its byte offset,
117    /// blocking the current thread until data is available.
118    ///
119    /// Returns `Ok(None)` when the download is complete.
120    ///
121    /// # Panics
122    ///
123    /// Panics if called from within an async runtime context. Use from a
124    /// regular thread or from [`tokio::task::spawn_blocking`] instead.
125    /// For the async-safe variant, use [`next`](Self::next).
126    pub fn blocking_next(&mut self) -> Result<Option<(u64, Bytes)>> {
127        if self.finished {
128            return Ok(None);
129        }
130        self.ensure_started();
131
132        match self.receiver.blocking_recv() {
133            Some(result) => self.process_term(result),
134            None => {
135                self.finished = true;
136                self.run_state.check_error()?;
137                Ok(None)
138            },
139        }
140    }
141
142    /// Returns the next chunk of downloaded data with its byte offset
143    /// asynchronously.
144    ///
145    /// Returns `Ok(None)` when the download is complete.
146    pub async fn next(&mut self) -> Result<Option<(u64, Bytes)>> {
147        if self.finished {
148            return Ok(None);
149        }
150        self.ensure_started();
151
152        if let Ok(result) = self.receiver.try_recv() {
153            return self.process_term(result);
154        }
155
156        let next_item = tokio::select! {
157            biased;
158            recv = self.receiver.recv() => recv,
159            _ = self.run_state.cancelled() => None,
160        };
161
162        match next_item {
163            Some(result) => self.process_term(result),
164            None => {
165                self.finished = true;
166                self.run_state.check_error()?;
167                Ok(None)
168            },
169        }
170    }
171
172    fn process_term(&mut self, result: Result<CompletedTerm>) -> Result<Option<(u64, Bytes)>> {
173        let term = result?;
174        self.run_state.report_bytes_written(term.data.len() as u64);
175        let offset = term.byte_range.start;
176        let data = term.data;
177        drop(term.permit);
178        Ok(Some((offset, data)))
179    }
180
181    /// Cancels the in-progress (or not-yet-started) download.
182    ///
183    /// Signals the shared run state so the reconstruction loop aborts at its
184    /// next check point. After calling this, subsequent calls to
185    /// [`blocking_next`](Self::blocking_next) / [`next`](Self::next) will
186    /// return `Ok(None)`.
187    pub fn cancel(&mut self) {
188        self.cancel_reconstruction();
189        let _ = self.start_signal.take();
190        self.receiver.close();
191        self.finished = true;
192    }
193
194    // ── Tracking methods ─────────────────────────────────────────────────
195
196    /// Total bytes expected for the reconstruction, read from the progress
197    /// updater. Returns 0 if not yet known or no progress updater is set.
198    pub fn total_bytes_expected(&self) -> u64 {
199        self.run_state
200            .progress_updater()
201            .map(|u| u.item().total_bytes.load(Ordering::Acquire))
202            .unwrap_or(0)
203    }
204
205    /// Bytes currently being fetched by in-progress tasks.
206    pub fn bytes_in_progress(&self) -> u64 {
207        self.progress.bytes_in_progress()
208    }
209
210    /// Bytes that have been delivered through the progress updater.
211    /// Returns 0 if no progress updater is set.
212    pub fn bytes_completed(&self) -> u64 {
213        self.run_state
214            .progress_updater()
215            .map(|u| u.total_bytes_completed())
216            .unwrap_or(0)
217    }
218
219    /// Number of tasks currently resolving data futures.
220    pub fn terms_in_progress(&self) -> u64 {
221        self.progress.terms_in_progress()
222    }
223
224    /// Returns `true` once the stream has reached terminal state.
225    ///
226    /// This flips to `true` after [`next`](Self::next) / [`blocking_next`](Self::blocking_next)
227    /// has observed the end-of-stream (`None`), or after [`cancel`](Self::cancel).
228    /// Buffered but unconsumed channel items do not count as complete.
229    pub fn is_complete(&self) -> bool {
230        self.finished
231    }
232}
233
234impl Drop for UnorderedDownloadStream {
235    fn drop(&mut self) {
236        self.cancel_reconstruction();
237        self.receiver.close();
238    }
239}