Skip to main content

ac_rustube/stream/
callback.rs

1use std::fmt;
2use std::future::Future;
3use std::path::{Path, PathBuf};
4use std::pin::Pin;
5
6use futures::FutureExt;
7use tokio::sync::{mpsc::{Receiver, Sender}, Mutex};
8use tokio::sync::mpsc;
9
10use crate::Result;
11
12pub type OnProgressClosure<'a> = Box<dyn FnMut(CallbackArguments) + Send + 'a>;
13pub type OnProgressAsyncClosure<'a> = Box<dyn FnMut(CallbackArguments) -> Pin<Box<dyn Future<Output=()> + Send + 'a>> + Send + Sync + 'a>;
14pub type OnCompleteClosure<'a> = Box<dyn FnMut(Option<PathBuf>) + Send + 'a>;
15pub type OnCompleteAsyncClosure<'a> = Box<dyn FnMut(Option<PathBuf>) -> Pin<Box<dyn Future<Output=()> + Send + 'a>> + Send + Sync + 'a>;
16
/// Message sent from the download to the progress dispatcher over the internal channel.
#[derive(Debug)]
pub(crate) enum InternalSignal {
    /// A progress update. The value is forwarded to handlers as
    /// `CallbackArguments::current_chunk`.
    // NOTE(review): presumably the cumulative number of bytes downloaded so far;
    // confirm against the sending side.
    Value(usize),
    /// No further progress updates will follow; the dispatcher stops listening.
    Finished,
}
22
23pub(crate) type InternalSender = Sender<InternalSignal>;
24
/// Arguments given either to an on_progress callback or to an on_progress receiver.
#[derive(Clone, Debug)]
pub struct CallbackArguments {
    /// The progress value reported by the download for this event.
    // NOTE(review): the throttled handlers divide this by 1_000_000 to detect
    // "one more MB", so it is presumably a cumulative byte count; confirm
    // against the downloader.
    pub current_chunk: usize,
    /// Total size of the download, or `None` if it could not be determined.
    ///
    /// It's more idiomatic to use this content length instead of a prefetched value,
    /// since the content of this field might change in the future during the download.
    pub content_length: Option<u64>,
}
34
35/// Type to process on_progress
36pub enum OnProgressType<'a> {
37    /// Box containing a closure to execute on progress
38    Closure(OnProgressClosure<'a>),
39    /// Box containing a async closure to execute on progress
40    AsyncClosure(OnProgressAsyncClosure<'a>),
41    /// Channel to send a message to on progress,
42    /// bool indicates whether or not to cancel on a closed channel
43    Channel(Sender<CallbackArguments>, bool),
44    /// Box containing a closure to execute on progress
45    /// Will get executed for every MB downloaded
46    SlowClosure(OnProgressClosure<'a>),
47    /// Box containing a async closure to execute on progress
48    /// Will get executed for every MB downloaded
49    SlowAsyncClosure(OnProgressAsyncClosure<'a>),
50    /// Channel to send a message to on progress,
51    /// bool indicates whether or not to cancel on a closed channel
52    /// Will get executed for every MB downloaded
53    SlowChannel(Sender<CallbackArguments>, bool),
54    None,
55}
56
57impl<'a> fmt::Debug for OnProgressType<'a> {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        let name = match self {
60            OnProgressType::AsyncClosure(_) => "AsyncClosure(async Fn)",
61            OnProgressType::Channel(_, _) => "Channel(Sender, bool)",
62            OnProgressType::Closure(_) => "Closure(Fn)",
63            OnProgressType::None => "None",
64            OnProgressType::SlowAsyncClosure(_) => "SlowAsyncClosure(async Fn)",
65            OnProgressType::SlowChannel(_, _) => "SlowChannel(Sender, bool)",
66            OnProgressType::SlowClosure(_) => "SlowClosure(Fn)",
67        };
68        f.write_str(name)
69    }
70}
71
72impl<'a> Default for OnProgressType<'a> {
73    fn default() -> Self {
74        OnProgressType::None
75    }
76}
77
78/// Type to process on_progress
79pub enum OnCompleteType<'a> {
80    /// Box containing a closure to execute on complete
81    Closure(OnCompleteClosure<'a>),
82    /// Box containing a async closure to execute on complete
83    AsyncClosure(OnCompleteAsyncClosure<'a>),
84    None,
85}
86
87impl<'a> fmt::Debug for OnCompleteType<'a> {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        let name = match self {
90            OnCompleteType::AsyncClosure(_) => "AsyncClosure(async Fn)",
91            OnCompleteType::Closure(_) => "Closure(Fn)",
92            OnCompleteType::None => "None",
93        };
94        f.write_str(name)
95    }
96}
97
98impl<'a> Default for OnCompleteType<'a> {
99    fn default() -> Self {
100        OnCompleteType::None
101    }
102}
103
104/// Methods and streams to process either on_progress or on_complete
105#[derive(Debug)]
106pub struct Callback<'a> {
107    pub on_progress: OnProgressType<'a>,
108    pub on_complete: OnCompleteType<'a>,
109    pub(crate) internal_sender: InternalSender,
110    pub(crate) internal_receiver: Option<Receiver<InternalSignal>>,
111}
112
113impl<'a> Callback<'a> {
114    /// Create a new callback struct without actual callbacks
115    pub fn new() -> Self {
116        let (tx, rx) = mpsc::channel(100);
117        Callback {
118            on_progress: OnProgressType::None,
119            on_complete: OnCompleteType::None,
120            internal_sender: tx,
121            internal_receiver: Some(rx),
122        }
123    }
124
125    /// Attach a closure to be executed on progress
126    ///
127    /// ### Warning:
128    /// This closure gets executed quite often, once every ~10kB progress.
129    /// If it's too slow, some on_progress events will be dropped.
130    /// If you are looking fore something that will be executed more seldom, look for
131    /// [Callback::connect_on_progress_closure_slow](crate::stream::callback::Callback::connect_on_progress_closure_slow)
132    #[inline]
133    #[must_use]
134    pub fn connect_on_progress_closure(mut self, closure: impl FnMut(CallbackArguments) + Send + 'a) -> Self {
135        self.on_progress = OnProgressType::Closure(Box::new(closure));
136        self
137    }
138
139    /// Attach a closure to be executed on progress. This closure will be executed
140    /// more seldom, around once for every MB downloaded.
141    #[inline]
142    #[must_use]
143    pub fn connect_on_progress_closure_slow(mut self, closure: impl FnMut(CallbackArguments) + Send + 'a) -> Self {
144        self.on_progress = OnProgressType::SlowClosure(Box::new(closure));
145        self
146    }
147
148    /// Attach a async closure to be executed on progress
149    ///
150    /// ### Warning:
151    /// This closure gets executed quite often, once every ~10kB progress.
152    /// If it's too slow, some on_progress events will be dropped.
153    /// If you are looking fore something that will be executed more seldom, look for
154    /// [Callback::connect_on_progress_closure_async_slow](crate::stream::callback::Callback::connect_on_progress_closure_async_slow)
155    #[inline]
156    #[must_use]
157    pub fn connect_on_progress_closure_async<Fut: Future<Output=()> + Send + 'a, F: Fn(CallbackArguments) -> Fut + Send + Sync + 'a>(mut self, closure: F) -> Self {
158        self.on_progress = OnProgressType::AsyncClosure(Box::new(move |arg| closure(arg).boxed()));
159        self
160    }
161
162    /// Attach a async closure to be executed on progress. This closure will be executed
163    /// more seldom, around once for every MB downloaded.
164    #[inline]
165    #[must_use]
166    pub fn connect_on_progress_closure_async_slow<Fut: Future<Output=()> + Send + 'a, F: Fn(CallbackArguments) -> Fut + Send + Sync + 'a>(mut self, closure: F) -> Self {
167        self.on_progress = OnProgressType::SlowAsyncClosure(Box::new(move |arg| closure(arg).boxed()));
168        self
169    }
170
171    /// Attach a bounded sender that receives messages on progress
172    /// cancel_or_close indicates whether or not to cancel the download, if the receiver is closed
173    ///
174    /// ### Warning:
175    /// This sender gets messages quite often, once every ~10kB progress.
176    /// If it's too slow, some on_progress events will be dropped.
177    #[inline]
178    #[must_use]
179    pub fn connect_on_progress_sender(
180        mut self,
181        sender: Sender<CallbackArguments>,
182        cancel_on_close: bool,
183    ) -> Self {
184        self.on_progress = OnProgressType::Channel(sender, cancel_on_close);
185        self
186    }
187
188    /// Attach a bounded sender that receives messages on progress
189    /// cancel_or_close indicates whether or not to cancel the download, if the receiver is closed
190    ///
191    /// This closure will be executed more seldom, around once for every MB downloaded.
192    #[inline]
193    #[must_use]
194    pub fn connect_on_progress_sender_slow(
195        mut self,
196        sender: Sender<CallbackArguments>,
197        cancel_on_close: bool,
198    ) -> Self {
199        self.on_progress = OnProgressType::SlowChannel(sender, cancel_on_close);
200        self
201    }
202
203    /// Attach a closure to be executed on complete
204    #[inline]
205    #[must_use]
206    pub fn connect_on_complete_closure(mut self, closure: impl FnMut(Option<PathBuf>) + Send + 'a) -> Self {
207        self.on_complete = OnCompleteType::Closure(Box::new(closure));
208        self
209    }
210
211    /// Attach a async closure to be executed on complete
212    #[inline]
213    #[must_use]
214    pub fn connect_on_complete_closure_async<Fut: Future<Output=()> + Send + 'a, F: Fn(Option<PathBuf>) -> Fut + Send + Sync + 'a>(mut self, closure: F) -> Self {
215        self.on_complete = OnCompleteType::AsyncClosure(Box::new(move |arg| closure(arg).boxed()));
216        self
217    }
218}
219
220impl<'a> Default for Callback<'a> {
221    fn default() -> Self {
222        Self::new()
223    }
224}
225
226impl super::Stream {
227    /// Attempts to downloads the [`Stream`](super::Stream)s resource.
228    /// This will download the video to <video_id>.mp4 in the current working directory.
229    /// Takes an [`Callback`](crate::stream::callback::Callback)
230    #[inline]
231    pub async fn download_with_callback<'a>(&'a self, callback: Callback<'a>) -> Result<PathBuf> {
232        self.wrap_callback(|channel| {
233            self.internal_download(channel)
234        }, callback).await
235    }
236
237    /// Attempts to downloads the [`Stream`](super::Stream)s resource.
238    /// This will download the video to <video_id>.mp4 in the provided directory.
239    /// Takes an [`Callback`](crate::stream::callback::Callback)
240    #[inline]
241    pub async fn download_to_dir_with_callback<'a, P: AsRef<Path>>(
242        &'a self,
243        dir: P,
244        callback: Callback<'a>,
245    ) -> Result<PathBuf> {
246        self.wrap_callback(|channel| {
247            self.internal_download_to_dir(dir, channel)
248        }, callback).await
249    }
250
251    /// Attempts to downloads the [`Stream`](super::Stream)s resource.
252    /// This will download the video to the provided file path.
253    /// Takes an [`Callback`](crate::stream::callback::Callback)
254    #[inline]
255    pub async fn download_to_with_callback<'a, P: AsRef<Path>>(&'a self, path: P, callback: Callback<'a>) -> Result<()> {
256        let _ = self.wrap_callback(|channel| {
257            self.internal_download_to(path, channel)
258        }, callback).await?;
259        Ok(())
260    }
261
262    async fn wrap_callback<'a, F: Future<Output=Result<PathBuf>>>(
263        &'a self,
264        to_wrap: impl FnOnce(Option<InternalSender>) -> F,
265        mut callback: Callback<'a>,
266    ) -> Result<PathBuf> {
267        let wrap_fut = to_wrap(Some(callback.internal_sender.clone()));
268        let aid_fut = self.on_progress(
269            callback.internal_receiver.take().expect("Callback cannot be used twice"),
270            std::mem::take(&mut callback.on_progress),
271        );
272        let (result, _) = futures::future::join(wrap_fut, aid_fut).await;
273
274        let path = result.as_ref().map(|p| p.clone()).ok();
275
276        Self::on_complete(std::mem::take(&mut callback.on_complete), path).await;
277
278        result
279    }
280
281    #[inline]
282    async fn on_progress<'a>(&'a self, mut receiver: Receiver<InternalSignal>, on_progress: OnProgressType<'a>) {
283        let last_trigger = Mutex::new(0);
284        let content_length = self.content_length().await.ok();
285        match on_progress {
286            OnProgressType::None => {}
287            OnProgressType::Closure(mut closure) => {
288                while let Some(data) = receiver.recv().await {
289                    match data {
290                        InternalSignal::Value(data) => {
291                            let arguments = CallbackArguments {
292                                current_chunk: data,
293                                content_length,
294                            };
295                            closure(arguments);
296                        }
297                        InternalSignal::Finished => break,
298                    }
299                }
300            }
301            OnProgressType::AsyncClosure(mut closure) => {
302                while let Some(data) = receiver.recv().await {
303                    match data {
304                        InternalSignal::Value(data) => {
305                            let arguments = CallbackArguments {
306                                current_chunk: data,
307                                content_length,
308                            };
309                            closure(arguments).await;
310                        }
311                        InternalSignal::Finished => break,
312                    }
313                }
314            }
315            OnProgressType::Channel(sender, cancel_on_close) => {
316                while let Some(data) = receiver.recv().await {
317                    match data {
318                        InternalSignal::Value(data) => {
319                            let arguments = CallbackArguments {
320                                current_chunk: data,
321                                content_length,
322                            };
323                            // await if channel is full
324                            if sender.send(arguments).await.is_err() && cancel_on_close {
325                                receiver.close()
326                            }
327                        }
328                        InternalSignal::Finished => break,
329                    }
330                }
331            }
332            OnProgressType::SlowClosure(mut closure) => {
333                while let Some(data) = receiver.recv().await {
334                    match data {
335                        InternalSignal::Value(data) => {
336                            if let Ok(mut trigger) = last_trigger.try_lock() {
337                                // discard any digits beyond the million digit
338                                let current_million = data / 1_000_000;
339                                if *trigger < current_million {
340                                    *trigger = current_million;
341                                    let arguments = CallbackArguments {
342                                        current_chunk: data,
343                                        content_length,
344                                    };
345                                    closure(arguments)
346                                }
347                            }
348                        }
349                        InternalSignal::Finished => break,
350                    }
351                }
352            }
353            OnProgressType::SlowAsyncClosure(mut closure) => {
354                while let Some(data) = receiver.recv().await {
355                    match data {
356                        InternalSignal::Value(data) => {
357                            if let Ok(mut trigger) = last_trigger.try_lock() {
358                                // discard any digits beyond the million digit
359                                let current_million = data / 1_000_000;
360                                if *trigger < current_million {
361                                    *trigger = current_million;
362                                    let arguments = CallbackArguments {
363                                        current_chunk: data,
364                                        content_length,
365                                    };
366                                    closure(arguments).await
367                                }
368                            }
369                        }
370                        InternalSignal::Finished => break,
371                    }
372                }
373            }
374            OnProgressType::SlowChannel(sender, cancel_on_close) => {
375                while let Some(data) = receiver.recv().await {
376                    match data {
377                        InternalSignal::Value(data) => {
378                            if let Ok(mut trigger) = last_trigger.try_lock() {
379                                // discard any digits beyond the million digit
380                                let current_million = data / 1_000_000;
381                                if *trigger < current_million {
382                                    *trigger = current_million;
383                                    let arguments = CallbackArguments {
384                                        current_chunk: data,
385                                        content_length,
386                                    };
387                                    if sender.send(arguments).await.is_err() && cancel_on_close {
388                                        receiver.close()
389                                    }
390                                }
391                            }
392                        }
393                        InternalSignal::Finished => break,
394                    }
395                }
396            }
397        }
398    }
399
400    #[inline]
401    async fn on_complete(on_complete: OnCompleteType<'_>, path: Option<PathBuf>) {
402        match on_complete {
403            OnCompleteType::None => {}
404            OnCompleteType::Closure(mut closure) => {
405                closure(path)
406            }
407            OnCompleteType::AsyncClosure(mut closure) => {
408                closure(path).await
409            }
410        }
411    }
412}