1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
mod bindings;

pub use bindings::{
    delete_object, get_download_url, get_storage, ref_, upload_bytes, Ref, Storage, UploadTask,
    UploadTaskSnapshot,
};
use futures::Stream;
use std::{
    cell::RefCell,
    pin::Pin,
    rc::Rc,
    task::{Context, Poll, Waker},
};
use wasm_bindgen::prelude::*;

impl UploadTask {
    pub fn async_iter(&self) -> UploadTaskAsyncIter {
        let waker: Rc<RefCell<Option<Waker>>> = Rc::default();
        let completed: Rc<RefCell<bool>> = Rc::default();
        let snapshot: Rc<RefCell<Option<UploadTaskSnapshot>>> = Rc::default();
        let err: Rc<RefCell<Option<JsValue>>> = Rc::default();

        let on_snapshot = Closure::new(clone!([snapshot, waker], move |js_snapshot| {
            trace!("UploadTask snapshot:");
            console_log!(&js_snapshot);

            *snapshot.borrow_mut() = Some(js_snapshot);

            if let Some(w) = waker.borrow().as_ref() {
                w.wake_by_ref();
            }
        }));
        let on_err = Closure::new(clone!([completed, err, waker], move |js_err| {
            error!("UploadTask error:");
            console_log!(&js_err);

            *err.borrow_mut() = Some(js_err);

            // Complete the stream since we errored
            *completed.borrow_mut() = true;

            if let Some(w) = waker.borrow().as_ref() {
                w.wake_by_ref()
            }
        }));
        let on_complete = Closure::new(clone!([completed, waker], move || {
            trace!("UploadTask completed");

            *completed.borrow_mut() = true;

            // Notify waker
            let waker_borrow = waker.borrow();

            if let Some(w) = waker_borrow.as_ref() {
                w.wake_by_ref();
            }
        }));

        let unsub = self.on(
            "state_changed",
            &on_snapshot,
            Some(&on_err),
            Some(&on_complete),
        );

        UploadTaskAsyncIter {
            _on_snapshot: on_snapshot,
            _on_err: on_err,
            _on_complete: on_complete,
            snapshot,
            err,
            completed,
            waker,
            unsub,
        }
    }
}

pub struct UploadTaskAsyncIter {
    _on_snapshot: Closure<dyn FnMut(UploadTaskSnapshot)>,
    _on_err: Closure<dyn FnMut(JsValue)>,
    _on_complete: Closure<dyn FnMut()>,
    snapshot: Rc<RefCell<Option<UploadTaskSnapshot>>>,
    err: Rc<RefCell<Option<JsValue>>>,
    completed: Rc<RefCell<bool>>,
    waker: Rc<RefCell<Option<Waker>>>,
    unsub: js_sys::Function,
}

impl Drop for UploadTaskAsyncIter {
    fn drop(&mut self) {
        self.unsub.call0(&JsValue::UNDEFINED).unwrap();
    }
}

impl Stream for UploadTaskAsyncIter {
    type Item = Result<UploadTaskSnapshot, JsValue>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        trace!("Polling UploadTaskAsyncIter");

        // Update waker
        *self.waker.borrow_mut() = Some(cx.waker().to_owned());

        if *self.completed.borrow() {
            if let Some(err) = self.err.borrow_mut().take() {
                trace!("UploadTaskAsyncIter errored, returning `Poll::Ready(Some(err))`");

                Poll::Ready(Some(Err(err)))
            } else {
                trace!("UploadTaskAsyncIter completed, returning `Poll::Ready(None)`");

                Poll::Ready(None)
            }
        } else if let Some(snapshot) = self.snapshot.borrow_mut().take() {
            trace!(
                "UploadTaskAsyncIter yielded snapshot, returning \
                 `Poll::Ready(Some(snapshot))`"
            );

            Poll::Ready(Some(Ok(snapshot)))
        } else {
            trace!("UploadTaskAsyncIter pending, returning `Poll::Pending`");

            Poll::Pending
        }
    }
}