1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
mod bindings;

pub use bindings::{
    delete_object, get_download_url, get_storage, ref_, upload_bytes, FullMetadata, Ref,
    SettableMetadata, Storage, UploadMetadata, UploadMetadataOptions, UploadTask,
    UploadTaskSnapshot,
};
use futures::Stream;
use std::{
    cell::RefCell,
    pin::Pin,
    rc::Rc,
    task::{Context, Poll, Waker},
};
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;

impl UploadTask {
    /// Subscribes to this task's `"state_changed"` event and returns an
    /// [`UploadTaskAsyncIter`] that surfaces the notifications as a stream.
    ///
    /// Three callbacks are registered through [`UploadTask::on`]: one stores
    /// the most recent snapshot, one stores an error and marks the stream as
    /// completed, and one marks the stream as completed. Each callback wakes
    /// the task that last polled the stream, if any. The returned value owns
    /// the callbacks and the unsubscribe function handed back by `on`.
    pub fn async_iter(&self) -> UploadTaskAsyncIter {
        /// Wakes the task stored in `slot`, if a poll has registered one.
        fn notify(slot: &RefCell<Option<Waker>>) {
            if let Some(registered) = slot.borrow().as_ref() {
                registered.wake_by_ref();
            }
        }

        // State shared between the JS callbacks and the returned stream.
        let snapshot = Rc::new(RefCell::new(None::<UploadTaskSnapshot>));
        let err = Rc::new(RefCell::new(None::<JsValue>));
        let completed = Rc::new(RefCell::new(false));
        let waker = Rc::new(RefCell::new(None::<Waker>));

        // Progress: remember the latest snapshot, overwriting any unread one.
        let on_snapshot = Closure::new(clone!([snapshot, waker], move |latest| {
            *snapshot.borrow_mut() = Some(latest);
            notify(&waker);
        }));

        // Failure: record the error and end the stream.
        let on_err = Closure::new(clone!([completed, err, waker], move |failure| {
            *err.borrow_mut() = Some(failure);
            *completed.borrow_mut() = true;
            notify(&waker);
        }));

        // Success: end the stream.
        let on_complete = Closure::new(clone!([completed, waker], move || {
            *completed.borrow_mut() = true;
            notify(&waker);
        }));

        let unsub = self.on(
            "state_changed",
            &on_snapshot,
            Some(&on_err),
            Some(&on_complete),
        );

        UploadTaskAsyncIter {
            _on_snapshot: on_snapshot,
            _on_err: on_err,
            _on_complete: on_complete,
            snapshot,
            err,
            completed,
            waker,
            unsub,
        }
    }
}

/// Stream of upload progress notifications created by
/// [`UploadTask::async_iter`].
///
/// Yields `Ok(snapshot)` for progress updates and `Err(value)` if the error
/// callback fired. The subscription is cancelled when the value is dropped.
pub struct UploadTaskAsyncIter {
    // The three closures are never read from Rust; they are held here so they
    // are not dropped while the JS side may still invoke them.
    _on_snapshot: Closure<dyn FnMut(UploadTaskSnapshot)>,
    _on_err: Closure<dyn FnMut(JsValue)>,
    _on_complete: Closure<dyn FnMut()>,
    /// Most recent snapshot that has not yet been handed out by `poll_next`.
    snapshot: Rc<RefCell<Option<UploadTaskSnapshot>>>,
    /// Error passed to the error callback, until `poll_next` takes it.
    err: Rc<RefCell<Option<JsValue>>>,
    /// Set to `true` by both the error and the completion callback.
    completed: Rc<RefCell<bool>>,
    /// Waker registered by `poll_next`; the callbacks wake it on every event.
    waker: Rc<RefCell<Option<Waker>>>,
    /// Function returned by `UploadTask::on`; invoked in `Drop`.
    unsub: js_sys::Function,
}

impl Drop for UploadTaskAsyncIter {
    fn drop(&mut self) {
        self.unsub.call0(&JsValue::UNDEFINED).unwrap();
    }
}

impl Stream for UploadTaskAsyncIter {
    type Item = Result<UploadTaskSnapshot, JsValue>;

    /// Returns the next upload event.
    ///
    /// Order of precedence on each poll:
    /// 1. a buffered snapshot is yielded as `Some(Ok(snapshot))`;
    /// 2. once the task has completed, a stored error is yielded once as
    ///    `Some(Err(err))`, after which the stream yields `None`;
    /// 3. otherwise the caller's waker is stored and `Pending` is returned.
    ///
    /// Only the latest snapshot is buffered, so snapshots that arrive between
    /// two polls overwrite each other.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Drain the buffered snapshot before looking at the terminal state.
        // If a snapshot and the completion/error callback both fire between
        // two polls, checking `completed` first would silently drop that
        // snapshot.
        let buffered = self.snapshot.borrow_mut().take();
        if let Some(snapshot) = buffered {
            return Poll::Ready(Some(Ok(snapshot)));
        }

        if *self.completed.borrow() {
            // `take` ensures the error is reported exactly once; every later
            // poll sees `None` here and ends the stream.
            let failure = self.err.borrow_mut().take();
            return Poll::Ready(failure.map(Err));
        }

        // Nothing to report yet: remember who to wake when a callback fires.
        *self.waker.borrow_mut() = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Fetches the metadata of the object referenced by `ref_`.
///
/// Awaits `bindings::get_metadata` and, on success, reinterprets the returned
/// value as [`FullMetadata`] with `unchecked_into`, i.e. without a runtime
/// type check. A failure from the binding is returned unchanged.
pub async fn get_metadata(ref_: Ref) -> Result<FullMetadata, JsValue> {
    let raw = bindings::get_metadata(ref_).await?;
    Ok(raw.unchecked_into())
}