xet_data/file_reconstruction/data_writer/download_stream.rs
1use std::sync::Arc;
2
3use bytes::Bytes;
4use tokio::sync::Notify;
5use tokio::sync::mpsc::UnboundedReceiver;
6use tracing::info;
7
8use super::super::error::{FileReconstructionError, Result};
9use super::super::file_reconstructor::FileReconstructor;
10use super::super::run_state::RunState;
11use super::sequential_writer::{SequentialRetrievalItem, SequentialWriter};
12
13/// A streaming download handle that yields data chunks as they are reconstructed.
14///
15/// Created by [`FileReconstructor::reconstruct_to_stream`]. The reconstruction
16/// task is spawned immediately but pauses until [`start`](Self::start) is
17/// called (or the first [`next`](Self::next) / [`blocking_next`](Self::blocking_next)).
18/// Because the `tokio::spawn` happens at construction time, subsequent calls to
19/// `start()`, `next()`, and `blocking_next()` do **not** require a tokio runtime
20/// context.
21///
22/// Data is delivered by pulling items directly from the sequential writer's
23/// internal queue, bypassing the synchronous writer thread entirely. Each call
24/// to [`blocking_next`](Self::blocking_next) / [`next`](Self::next) returns the
25/// next sequential chunk, or `None` when the download is complete. Any
26/// reconstruction error is surfaced on the call that would have returned the
27/// next chunk (or on the final `None` boundary) via the shared run state.
28pub struct DownloadStream {
29 /// Channel receiver for sequential retrieval items from the writer queue.
30 receiver: UnboundedReceiver<SequentialRetrievalItem>,
31 /// Whether the stream has finished (no more data).
32 finished: bool,
33 /// Shared run state with the `FileReconstructor`. When cancelled,
34 /// the reconstruction loop aborts promptly at its next check point or
35 /// `select!` branch. Also used for progress reporting and error propagation.
36 run_state: Arc<RunState>,
37 /// Signal to unblock the spawned reconstruction task. `Some` means
38 /// `start()` has not yet been called; the spawned task is waiting.
39 start_signal: Option<Arc<Notify>>,
40}
41
42impl DownloadStream {
43 /// Creates a new `DownloadStream`, immediately spawning the reconstruction
44 /// task on the current tokio runtime. The task blocks on an internal
45 /// [`Notify`] until [`start`](Self::start) is called.
46 ///
47 /// # Panics
48 ///
49 /// Panics if called outside a tokio runtime context.
50 pub(crate) fn new(reconstructor: FileReconstructor, run_state: Arc<RunState>) -> Self {
51 let (data_writer, receiver) = SequentialWriter::new_streaming(run_state.clone());
52 let start_signal = Arc::new(Notify::new());
53
54 let signal = start_signal.clone();
55 let rs = run_state.clone();
56 tokio::spawn(async move {
57 signal.notified().await;
58 info!(file_hash = %rs.file_hash(), "Starting download stream");
59 let _ = reconstructor.run(data_writer, rs, true).await;
60 });
61
62 Self {
63 receiver,
64 finished: false,
65 run_state,
66 start_signal: Some(start_signal),
67 }
68 }
69
70 pub(crate) fn abort_callback(&self) -> Box<dyn Fn() + Send + Sync> {
71 let run_state = self.run_state.clone();
72 let start_signal = self.start_signal.clone();
73 Box::new(move || {
74 run_state.cancel();
75 if let Some(signal) = start_signal.as_ref() {
76 signal.notify_one();
77 }
78 })
79 }
80
81 /// Unblocks the reconstruction task so it begins producing data.
82 ///
83 /// If already started, this is a no-op. Called automatically on the first
84 /// [`next`](Self::next) / [`blocking_next`](Self::blocking_next).
85 ///
86 /// This method is non-async and does not require a tokio runtime context.
87 pub fn start(&mut self) {
88 if let Some(signal) = self.start_signal.take() {
89 signal.notify_one();
90 }
91 }
92
93 fn ensure_started(&mut self) {
94 if self.start_signal.is_some() {
95 self.start();
96 }
97 }
98
99 fn cancel_reconstruction(&self) {
100 self.run_state.cancel();
101 if let Some(signal) = self.start_signal.as_ref() {
102 signal.notify_one();
103 }
104 }
105
106 /// Returns the next chunk of downloaded data, blocking the current thread
107 /// until data is available.
108 ///
109 /// Returns `Ok(None)` when the download is complete.
110 ///
111 /// # Panics
112 ///
113 /// Panics if called from within an async runtime context (e.g. inside a
114 /// `tokio::spawn` or `async fn`). Use from a regular thread or from
115 /// [`tokio::task::spawn_blocking`] instead. For the async-safe variant,
116 /// use [`next`](Self::next).
117 pub fn blocking_next(&mut self) -> Result<Option<Bytes>> {
118 if self.finished {
119 return Ok(None);
120 }
121 self.ensure_started();
122
123 match self.receiver.blocking_recv() {
124 Some(SequentialRetrievalItem::Data { receiver, permit }) => {
125 let data = match receiver.blocking_recv() {
126 Ok(data) => data,
127 Err(_) => {
128 self.run_state.check_error()?;
129 return Err(FileReconstructionError::InternalWriterError(
130 "Data sender was dropped before sending data.".to_string(),
131 ));
132 },
133 };
134 self.run_state.report_bytes_written(data.len() as u64);
135 drop(permit);
136 Ok(Some(data))
137 },
138 Some(SequentialRetrievalItem::Finish) | None => {
139 self.finished = true;
140 self.run_state.check_error()?;
141 Ok(None)
142 },
143 }
144 }
145
146 /// Returns the next chunk of downloaded data asynchronously.
147 ///
148 /// Returns `Ok(None)` when the download is complete or cancelled.
149 pub async fn next(&mut self) -> Result<Option<Bytes>> {
150 if self.finished {
151 return Ok(None);
152 }
153 self.ensure_started();
154
155 let item = if let Ok(item) = self.receiver.try_recv() {
156 Some(item)
157 } else {
158 tokio::select! {
159 biased;
160 recv = self.receiver.recv() => recv,
161 _ = self.run_state.cancelled() => None,
162 }
163 };
164
165 match item {
166 Some(SequentialRetrievalItem::Data { receiver, permit }) => {
167 let data = match receiver.await {
168 Ok(data) => data,
169 Err(_) => {
170 self.run_state.check_error()?;
171 return Err(FileReconstructionError::InternalWriterError(
172 "Data sender was dropped before sending data.".to_string(),
173 ));
174 },
175 };
176 self.run_state.report_bytes_written(data.len() as u64);
177 drop(permit);
178 Ok(Some(data))
179 },
180 Some(SequentialRetrievalItem::Finish) | None => {
181 self.finished = true;
182 self.run_state.check_error()?;
183 Ok(None)
184 },
185 }
186 }
187
188 /// Cancels the in-progress (or not-yet-started) download.
189 ///
190 /// Signals the shared run state so the reconstruction loop aborts at its
191 /// next check point or `select!` branch, and closes the channel receiver.
192 /// After calling this, subsequent calls to [`blocking_next`](Self::blocking_next)
193 /// / [`next`](Self::next) will return `Ok(None)`.
194 pub fn cancel(&mut self) {
195 self.cancel_reconstruction();
196 let _ = self.start_signal.take();
197 self.receiver.close();
198 self.finished = true;
199 }
200}
201
202impl Drop for DownloadStream {
203 fn drop(&mut self) {
204 self.cancel_reconstruction();
205 self.receiver.close();
206 }
207}