1use std::fmt;
2use std::future::Future;
3use std::path::{Path, PathBuf};
4use std::pin::Pin;
5
6use futures::FutureExt;
7use tokio::sync::{mpsc::{Receiver, Sender}, Mutex};
8use tokio::sync::mpsc;
9
10use crate::Result;
11
/// Boxed synchronous progress handler, called with a [`CallbackArguments`] value.
pub type OnProgressClosure<'a> = Box<dyn FnMut(CallbackArguments) + Send + 'a>;
/// Boxed asynchronous progress handler: called with a [`CallbackArguments`] value, it returns a
/// pinned, boxed future with no output.
pub type OnProgressAsyncClosure<'a> = Box<dyn FnMut(CallbackArguments) -> Pin<Box<dyn Future<Output=()> + Send + 'a>> + Send + Sync + 'a>;
/// Boxed synchronous completion handler, called with an optional path.
pub type OnCompleteClosure<'a> = Box<dyn FnMut(Option<PathBuf>) + Send + 'a>;
/// Boxed asynchronous completion handler: called with an optional path, it returns a pinned,
/// boxed future with no output.
pub type OnCompleteAsyncClosure<'a> = Box<dyn FnMut(Option<PathBuf>) -> Pin<Box<dyn Future<Output=()> + Send + 'a>> + Send + Sync + 'a>;
16
/// Message carried by the crate-internal channel that feeds the progress loop.
#[derive(Debug)]
pub(crate) enum InternalSignal {
    /// A progress counter; the progress loop forwards it to the registered handler as
    /// `CallbackArguments::current_chunk`.
    Value(usize),
    /// Tells the progress loop to stop receiving.
    Finished,
}

/// Sending half of the internal channel carrying [`InternalSignal`] messages.
pub(crate) type InternalSender = Sender<InternalSignal>;
24
/// Arguments handed to every progress handler.
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub struct CallbackArguments {
    /// The counter carried by the progress signal that triggered this call.
    // NOTE(review): the throttled handlers divide this by 1_000_000, so it is presumably a
    // cumulative byte count -- confirm against the code that sends the signals.
    pub current_chunk: usize,
    /// The value returned by the stream's `content_length()`, or `None` if that call failed.
    pub content_length: Option<u64>,
}
34
/// The progress handler attached to a [`Callback`].
///
/// The `Slow*` variants behave like their plain counterparts, but are only triggered when
/// `current_chunk / 1_000_000` has grown past the value seen at the previous trigger.
pub enum OnProgressType<'a> {
    /// Synchronous closure, called for every progress value.
    Closure(OnProgressClosure<'a>),
    /// Asynchronous closure, called and awaited for every progress value.
    AsyncClosure(OnProgressAsyncClosure<'a>),
    /// Channel that receives every progress value. When the flag is `true` and sending
    /// fails, the internal receiver is closed.
    // NOTE(review): closing the internal receiver presumably cancels the download -- confirm
    // against the sending side.
    Channel(Sender<CallbackArguments>, bool),
    /// Throttled variant of [`OnProgressType::Closure`].
    SlowClosure(OnProgressClosure<'a>),
    /// Throttled variant of [`OnProgressType::AsyncClosure`].
    SlowAsyncClosure(OnProgressAsyncClosure<'a>),
    /// Throttled variant of [`OnProgressType::Channel`].
    SlowChannel(Sender<CallbackArguments>, bool),
    /// No progress handler is attached.
    None,
}
56
57impl<'a> fmt::Debug for OnProgressType<'a> {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 let name = match self {
60 OnProgressType::AsyncClosure(_) => "AsyncClosure(async Fn)",
61 OnProgressType::Channel(_, _) => "Channel(Sender, bool)",
62 OnProgressType::Closure(_) => "Closure(Fn)",
63 OnProgressType::None => "None",
64 OnProgressType::SlowAsyncClosure(_) => "SlowAsyncClosure(async Fn)",
65 OnProgressType::SlowChannel(_, _) => "SlowChannel(Sender, bool)",
66 OnProgressType::SlowClosure(_) => "SlowClosure(Fn)",
67 };
68 f.write_str(name)
69 }
70}
71
72impl<'a> Default for OnProgressType<'a> {
73 fn default() -> Self {
74 OnProgressType::None
75 }
76}
77
/// The completion handler attached to a [`Callback`].
///
/// The handler receives `Some(path)` when the wrapped download returned `Ok(path)` and
/// `None` when it returned an error.
pub enum OnCompleteType<'a> {
    /// Synchronous closure, called once with the optional path.
    Closure(OnCompleteClosure<'a>),
    /// Asynchronous closure, called once with the optional path and awaited.
    AsyncClosure(OnCompleteAsyncClosure<'a>),
    /// No completion handler is attached.
    None,
}
86
87impl<'a> fmt::Debug for OnCompleteType<'a> {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 let name = match self {
90 OnCompleteType::AsyncClosure(_) => "AsyncClosure(async Fn)",
91 OnCompleteType::Closure(_) => "Closure(Fn)",
92 OnCompleteType::None => "None",
93 };
94 f.write_str(name)
95 }
96}
97
98impl<'a> Default for OnCompleteType<'a> {
99 fn default() -> Self {
100 OnCompleteType::None
101 }
102}
103
/// Bundles an optional progress handler and an optional completion handler together with
/// the internal channel that feeds progress signals to the progress handler.
#[derive(Debug)]
pub struct Callback<'a> {
    /// Handler triggered by progress signals.
    pub on_progress: OnProgressType<'a>,
    /// Handler triggered once after the wrapped download has returned.
    pub on_complete: OnCompleteType<'a>,
    /// Sending half of the internal channel; handed to the download routine.
    pub(crate) internal_sender: InternalSender,
    /// Receiving half of the internal channel. It is taken out when the callback is used,
    /// hence the `Option`.
    pub(crate) internal_receiver: Option<Receiver<InternalSignal>>,
}
112
113impl<'a> Callback<'a> {
114 pub fn new() -> Self {
116 let (tx, rx) = mpsc::channel(100);
117 Callback {
118 on_progress: OnProgressType::None,
119 on_complete: OnCompleteType::None,
120 internal_sender: tx,
121 internal_receiver: Some(rx),
122 }
123 }
124
125 #[inline]
133 #[must_use]
134 pub fn connect_on_progress_closure(mut self, closure: impl FnMut(CallbackArguments) + Send + 'a) -> Self {
135 self.on_progress = OnProgressType::Closure(Box::new(closure));
136 self
137 }
138
139 #[inline]
142 #[must_use]
143 pub fn connect_on_progress_closure_slow(mut self, closure: impl FnMut(CallbackArguments) + Send + 'a) -> Self {
144 self.on_progress = OnProgressType::SlowClosure(Box::new(closure));
145 self
146 }
147
148 #[inline]
156 #[must_use]
157 pub fn connect_on_progress_closure_async<Fut: Future<Output=()> + Send + 'a, F: Fn(CallbackArguments) -> Fut + Send + Sync + 'a>(mut self, closure: F) -> Self {
158 self.on_progress = OnProgressType::AsyncClosure(Box::new(move |arg| closure(arg).boxed()));
159 self
160 }
161
162 #[inline]
165 #[must_use]
166 pub fn connect_on_progress_closure_async_slow<Fut: Future<Output=()> + Send + 'a, F: Fn(CallbackArguments) -> Fut + Send + Sync + 'a>(mut self, closure: F) -> Self {
167 self.on_progress = OnProgressType::SlowAsyncClosure(Box::new(move |arg| closure(arg).boxed()));
168 self
169 }
170
171 #[inline]
178 #[must_use]
179 pub fn connect_on_progress_sender(
180 mut self,
181 sender: Sender<CallbackArguments>,
182 cancel_on_close: bool,
183 ) -> Self {
184 self.on_progress = OnProgressType::Channel(sender, cancel_on_close);
185 self
186 }
187
188 #[inline]
193 #[must_use]
194 pub fn connect_on_progress_sender_slow(
195 mut self,
196 sender: Sender<CallbackArguments>,
197 cancel_on_close: bool,
198 ) -> Self {
199 self.on_progress = OnProgressType::SlowChannel(sender, cancel_on_close);
200 self
201 }
202
203 #[inline]
205 #[must_use]
206 pub fn connect_on_complete_closure(mut self, closure: impl FnMut(Option<PathBuf>) + Send + 'a) -> Self {
207 self.on_complete = OnCompleteType::Closure(Box::new(closure));
208 self
209 }
210
211 #[inline]
213 #[must_use]
214 pub fn connect_on_complete_closure_async<Fut: Future<Output=()> + Send + 'a, F: Fn(Option<PathBuf>) -> Fut + Send + Sync + 'a>(mut self, closure: F) -> Self {
215 self.on_complete = OnCompleteType::AsyncClosure(Box::new(move |arg| closure(arg).boxed()));
216 self
217 }
218}
219
220impl<'a> Default for Callback<'a> {
221 fn default() -> Self {
222 Self::new()
223 }
224}
225
226impl super::Stream {
227 #[inline]
231 pub async fn download_with_callback<'a>(&'a self, callback: Callback<'a>) -> Result<PathBuf> {
232 self.wrap_callback(|channel| {
233 self.internal_download(channel)
234 }, callback).await
235 }
236
237 #[inline]
241 pub async fn download_to_dir_with_callback<'a, P: AsRef<Path>>(
242 &'a self,
243 dir: P,
244 callback: Callback<'a>,
245 ) -> Result<PathBuf> {
246 self.wrap_callback(|channel| {
247 self.internal_download_to_dir(dir, channel)
248 }, callback).await
249 }
250
251 #[inline]
255 pub async fn download_to_with_callback<'a, P: AsRef<Path>>(&'a self, path: P, callback: Callback<'a>) -> Result<()> {
256 let _ = self.wrap_callback(|channel| {
257 self.internal_download_to(path, channel)
258 }, callback).await?;
259 Ok(())
260 }
261
262 async fn wrap_callback<'a, F: Future<Output=Result<PathBuf>>>(
263 &'a self,
264 to_wrap: impl FnOnce(Option<InternalSender>) -> F,
265 mut callback: Callback<'a>,
266 ) -> Result<PathBuf> {
267 let wrap_fut = to_wrap(Some(callback.internal_sender.clone()));
268 let aid_fut = self.on_progress(
269 callback.internal_receiver.take().expect("Callback cannot be used twice"),
270 std::mem::take(&mut callback.on_progress),
271 );
272 let (result, _) = futures::future::join(wrap_fut, aid_fut).await;
273
274 let path = result.as_ref().map(|p| p.clone()).ok();
275
276 Self::on_complete(std::mem::take(&mut callback.on_complete), path).await;
277
278 result
279 }
280
281 #[inline]
282 async fn on_progress<'a>(&'a self, mut receiver: Receiver<InternalSignal>, on_progress: OnProgressType<'a>) {
283 let last_trigger = Mutex::new(0);
284 let content_length = self.content_length().await.ok();
285 match on_progress {
286 OnProgressType::None => {}
287 OnProgressType::Closure(mut closure) => {
288 while let Some(data) = receiver.recv().await {
289 match data {
290 InternalSignal::Value(data) => {
291 let arguments = CallbackArguments {
292 current_chunk: data,
293 content_length,
294 };
295 closure(arguments);
296 }
297 InternalSignal::Finished => break,
298 }
299 }
300 }
301 OnProgressType::AsyncClosure(mut closure) => {
302 while let Some(data) = receiver.recv().await {
303 match data {
304 InternalSignal::Value(data) => {
305 let arguments = CallbackArguments {
306 current_chunk: data,
307 content_length,
308 };
309 closure(arguments).await;
310 }
311 InternalSignal::Finished => break,
312 }
313 }
314 }
315 OnProgressType::Channel(sender, cancel_on_close) => {
316 while let Some(data) = receiver.recv().await {
317 match data {
318 InternalSignal::Value(data) => {
319 let arguments = CallbackArguments {
320 current_chunk: data,
321 content_length,
322 };
323 if sender.send(arguments).await.is_err() && cancel_on_close {
325 receiver.close()
326 }
327 }
328 InternalSignal::Finished => break,
329 }
330 }
331 }
332 OnProgressType::SlowClosure(mut closure) => {
333 while let Some(data) = receiver.recv().await {
334 match data {
335 InternalSignal::Value(data) => {
336 if let Ok(mut trigger) = last_trigger.try_lock() {
337 let current_million = data / 1_000_000;
339 if *trigger < current_million {
340 *trigger = current_million;
341 let arguments = CallbackArguments {
342 current_chunk: data,
343 content_length,
344 };
345 closure(arguments)
346 }
347 }
348 }
349 InternalSignal::Finished => break,
350 }
351 }
352 }
353 OnProgressType::SlowAsyncClosure(mut closure) => {
354 while let Some(data) = receiver.recv().await {
355 match data {
356 InternalSignal::Value(data) => {
357 if let Ok(mut trigger) = last_trigger.try_lock() {
358 let current_million = data / 1_000_000;
360 if *trigger < current_million {
361 *trigger = current_million;
362 let arguments = CallbackArguments {
363 current_chunk: data,
364 content_length,
365 };
366 closure(arguments).await
367 }
368 }
369 }
370 InternalSignal::Finished => break,
371 }
372 }
373 }
374 OnProgressType::SlowChannel(sender, cancel_on_close) => {
375 while let Some(data) = receiver.recv().await {
376 match data {
377 InternalSignal::Value(data) => {
378 if let Ok(mut trigger) = last_trigger.try_lock() {
379 let current_million = data / 1_000_000;
381 if *trigger < current_million {
382 *trigger = current_million;
383 let arguments = CallbackArguments {
384 current_chunk: data,
385 content_length,
386 };
387 if sender.send(arguments).await.is_err() && cancel_on_close {
388 receiver.close()
389 }
390 }
391 }
392 }
393 InternalSignal::Finished => break,
394 }
395 }
396 }
397 }
398 }
399
400 #[inline]
401 async fn on_complete(on_complete: OnCompleteType<'_>, path: Option<PathBuf>) {
402 match on_complete {
403 OnCompleteType::None => {}
404 OnCompleteType::Closure(mut closure) => {
405 closure(path)
406 }
407 OnCompleteType::AsyncClosure(mut closure) => {
408 closure(path).await
409 }
410 }
411 }
412}