xet_data/file_reconstruction/data_writer/unordered_download_stream.rs
1use std::sync::Arc;
2use std::sync::atomic::Ordering;
3
4use bytes::Bytes;
5use tokio::sync::Notify;
6use tokio::sync::mpsc::UnboundedReceiver;
7use tracing::info;
8
9use super::super::error::Result;
10use super::super::file_reconstructor::FileReconstructor;
11use super::super::run_state::RunState;
12use super::unordered_writer::{CompletedTerm, UnorderedWriterProgress};
13
14/// A streaming download handle that yields data chunks in completion order,
15/// each tagged with its byte offset in the output file.
16///
17/// Created by [`FileReconstructor::reconstruct_to_unordered_stream`]. The
18/// reconstruction task is spawned immediately but pauses until
19/// [`start`](Self::start) is called (or the first [`next`](Self::next) /
20/// [`blocking_next`](Self::blocking_next)). Because the `tokio::spawn`
21/// happens at construction time, subsequent calls to `start()`, `next()`,
22/// and `blocking_next()` do **not** require a tokio runtime context.
23///
24/// Unlike [`DownloadStream`](super::download_stream::DownloadStream), data
25/// chunks may arrive out of order. Each chunk is returned as `(offset, Bytes)`
26/// so the consumer knows where it belongs. Progress can be monitored via
27/// the tracking methods which read shared atomic counters.
28///
29/// Holds only `Arc<WriterProgress>`, not the writer itself, so the channel
30/// sender is dropped naturally when the reconstruction task finishes.
31pub struct UnorderedDownloadStream {
32 /// Shared atomic progress counters (also held by the writer and its tasks).
33 progress: Arc<UnorderedWriterProgress>,
34
35 /// Channel receiver for completed terms from spawned tasks.
36 receiver: UnboundedReceiver<Result<CompletedTerm>>,
37
38 /// Whether the stream has finished (no more data).
39 finished: bool,
40
41 /// Shared run state with the `FileReconstructor`.
42 run_state: Arc<RunState>,
43
44 /// Signal to unblock the spawned reconstruction task. `Some` means
45 /// `start()` has not yet been called; the spawned task is waiting.
46 start_signal: Option<Arc<Notify>>,
47}
48
49impl UnorderedDownloadStream {
50 /// Creates a new `UnorderedDownloadStream`, immediately spawning the
51 /// reconstruction task on the current tokio runtime. The task blocks
52 /// on an internal [`Notify`] until [`start`](Self::start) is called.
53 ///
54 /// # Panics
55 ///
56 /// Panics if called outside a tokio runtime context.
57 pub(crate) fn new(reconstructor: FileReconstructor, run_state: Arc<RunState>) -> Self {
58 use super::unordered_writer::UnorderedWriter;
59
60 let (writer, receiver, progress) = UnorderedWriter::new_streaming(run_state.clone());
61 let start_signal = Arc::new(Notify::new());
62
63 let signal = start_signal.clone();
64 let rs = run_state.clone();
65 tokio::spawn(async move {
66 signal.notified().await;
67 info!(file_hash = %rs.file_hash(), "Starting unordered download stream");
68 let _ = reconstructor.run(writer, rs, true).await;
69 });
70
71 Self {
72 progress,
73 receiver,
74 finished: false,
75 run_state,
76 start_signal: Some(start_signal),
77 }
78 }
79
80 pub(crate) fn abort_callback(&self) -> Box<dyn Fn() + Send + Sync> {
81 let run_state = self.run_state.clone();
82 let start_signal = self.start_signal.clone();
83 Box::new(move || {
84 run_state.cancel();
85 if let Some(signal) = start_signal.as_ref() {
86 signal.notify_one();
87 }
88 })
89 }
90
91 /// Unblocks the reconstruction task so it begins producing data.
92 ///
93 /// If already started, this is a no-op. Called automatically on the first
94 /// [`next`](Self::next) / [`blocking_next`](Self::blocking_next).
95 ///
96 /// This method is non-async and does not require a tokio runtime context.
97 pub fn start(&mut self) {
98 if let Some(signal) = self.start_signal.take() {
99 signal.notify_one();
100 }
101 }
102
103 fn ensure_started(&mut self) {
104 if self.start_signal.is_some() {
105 self.start();
106 }
107 }
108
109 fn cancel_reconstruction(&self) {
110 self.run_state.cancel();
111 if let Some(signal) = self.start_signal.as_ref() {
112 signal.notify_one();
113 }
114 }
115
116 /// Returns the next chunk of downloaded data with its byte offset,
117 /// blocking the current thread until data is available.
118 ///
119 /// Returns `Ok(None)` when the download is complete.
120 ///
121 /// # Panics
122 ///
123 /// Panics if called from within an async runtime context. Use from a
124 /// regular thread or from [`tokio::task::spawn_blocking`] instead.
125 /// For the async-safe variant, use [`next`](Self::next).
126 pub fn blocking_next(&mut self) -> Result<Option<(u64, Bytes)>> {
127 if self.finished {
128 return Ok(None);
129 }
130 self.ensure_started();
131
132 match self.receiver.blocking_recv() {
133 Some(result) => self.process_term(result),
134 None => {
135 self.finished = true;
136 self.run_state.check_error()?;
137 Ok(None)
138 },
139 }
140 }
141
142 /// Returns the next chunk of downloaded data with its byte offset
143 /// asynchronously.
144 ///
145 /// Returns `Ok(None)` when the download is complete.
146 pub async fn next(&mut self) -> Result<Option<(u64, Bytes)>> {
147 if self.finished {
148 return Ok(None);
149 }
150 self.ensure_started();
151
152 if let Ok(result) = self.receiver.try_recv() {
153 return self.process_term(result);
154 }
155
156 let next_item = tokio::select! {
157 biased;
158 recv = self.receiver.recv() => recv,
159 _ = self.run_state.cancelled() => None,
160 };
161
162 match next_item {
163 Some(result) => self.process_term(result),
164 None => {
165 self.finished = true;
166 self.run_state.check_error()?;
167 Ok(None)
168 },
169 }
170 }
171
172 fn process_term(&mut self, result: Result<CompletedTerm>) -> Result<Option<(u64, Bytes)>> {
173 let term = result?;
174 self.run_state.report_bytes_written(term.data.len() as u64);
175 let offset = term.byte_range.start;
176 let data = term.data;
177 drop(term.permit);
178 Ok(Some((offset, data)))
179 }
180
181 /// Cancels the in-progress (or not-yet-started) download.
182 ///
183 /// Signals the shared run state so the reconstruction loop aborts at its
184 /// next check point. After calling this, subsequent calls to
185 /// [`blocking_next`](Self::blocking_next) / [`next`](Self::next) will
186 /// return `Ok(None)`.
187 pub fn cancel(&mut self) {
188 self.cancel_reconstruction();
189 let _ = self.start_signal.take();
190 self.receiver.close();
191 self.finished = true;
192 }
193
194 // ── Tracking methods ─────────────────────────────────────────────────
195
196 /// Total bytes expected for the reconstruction, read from the progress
197 /// updater. Returns 0 if not yet known or no progress updater is set.
198 pub fn total_bytes_expected(&self) -> u64 {
199 self.run_state
200 .progress_updater()
201 .map(|u| u.item().total_bytes.load(Ordering::Acquire))
202 .unwrap_or(0)
203 }
204
205 /// Bytes currently being fetched by in-progress tasks.
206 pub fn bytes_in_progress(&self) -> u64 {
207 self.progress.bytes_in_progress()
208 }
209
210 /// Bytes that have been delivered through the progress updater.
211 /// Returns 0 if no progress updater is set.
212 pub fn bytes_completed(&self) -> u64 {
213 self.run_state
214 .progress_updater()
215 .map(|u| u.total_bytes_completed())
216 .unwrap_or(0)
217 }
218
219 /// Number of tasks currently resolving data futures.
220 pub fn terms_in_progress(&self) -> u64 {
221 self.progress.terms_in_progress()
222 }
223
224 /// Returns `true` once the stream has reached terminal state.
225 ///
226 /// This flips to `true` after [`next`](Self::next) / [`blocking_next`](Self::blocking_next)
227 /// has observed the end-of-stream (`None`), or after [`cancel`](Self::cancel).
228 /// Buffered but unconsumed channel items do not count as complete.
229 pub fn is_complete(&self) -> bool {
230 self.finished
231 }
232}
233
234impl Drop for UnorderedDownloadStream {
235 fn drop(&mut self) {
236 self.cancel_reconstruction();
237 self.receiver.close();
238 }
239}