1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
//! Bindings and conversions for
//! [writable streams](https://developer.mozilla.org/en-US/docs/Web/API/WritableStream).
use std::marker::PhantomData;

use futures::Sink;
use wasm_bindgen::prelude::*;

pub use into_sink::IntoSink;
use into_underlying_sink::IntoUnderlyingSink;

use crate::util::promise_to_void_future;

mod into_sink;
mod into_underlying_sink;
pub mod sys;

/// A [`WritableStream`](https://developer.mozilla.org/en-US/docs/Web/API/WritableStream).
///
/// This type owns a [raw JavaScript stream](sys::WritableStream) and exposes it through
/// a Rust-friendly API.
///
/// A `WritableStream` can be created from a [raw JavaScript stream](sys::WritableStream) with
/// [`from_raw`](Self::from_raw), or from a Rust [`Sink`](Sink)
/// with [`from_sink`](Self::from_sink).
///
/// It can be converted back into a [raw JavaScript stream](sys::WritableStream) with
/// [`into_raw`](Self::into_raw), or into a Rust [`Sink`](Sink)
/// with [`into_sink`](Self::into_sink).
#[derive(Debug)]
pub struct WritableStream {
    /// The wrapped JavaScript stream object.
    raw: sys::WritableStream,
}

impl WritableStream {
    /// Wraps a [JavaScript stream](sys::WritableStream) in a new `WritableStream`.
    #[inline]
    pub fn from_raw(raw: sys::WritableStream) -> Self {
        WritableStream { raw }
    }

    /// Builds a new `WritableStream` that forwards its chunks to the given [`Sink`](Sink).
    ///
    /// The sink must accept raw [`JsValue`](JsValue) items and report raw `JsValue` errors.
    /// Adapt a differently-typed sink with [`with`](futures::SinkExt::with) and/or
    /// [`sink_map_err`](futures::SinkExt::sink_map_err) before handing it to this function.
    pub fn from_sink<Si>(sink: Si) -> Self
    where
        Si: Sink<JsValue, Error = JsValue> + 'static,
    {
        let underlying = IntoUnderlyingSink::new(Box::new(sink));
        // No explicit queuing strategy is given, so the default one applies
        // (a high water mark of 1 chunk). A high water mark of 0 must be avoided,
        // because it would break piping to the writable stream.
        Self {
            raw: sys::WritableStream::new_with_sink(underlying),
        }
    }

    /// Borrows the underlying [JavaScript stream](sys::WritableStream).
    #[inline]
    pub fn as_raw(&self) -> &sys::WritableStream {
        let Self { raw } = self;
        raw
    }

    /// Consumes this `WritableStream` and hands back the underlying
    /// [JavaScript stream](sys::WritableStream).
    #[inline]
    pub fn into_raw(self) -> sys::WritableStream {
        let Self { raw } = self;
        raw
    }

    /// Reports whether the stream is currently
    /// [locked to a writer](https://streams.spec.whatwg.org/#lock).
    #[inline]
    pub fn is_locked(&self) -> bool {
        self.raw.is_locked()
    }

    /// [Aborts](https://streams.spec.whatwg.org/#abort-a-writable-stream) the stream.
    ///
    /// This signals that the producer can no longer successfully write to the stream, so it
    /// is to be moved to an errored state immediately, discarding any queued-up writes.
    ///
    /// If the stream is currently locked to a writer, then this returns an error.
    pub async fn abort(&mut self) -> Result<(), JsValue> {
        let aborting = self.raw.abort();
        promise_to_void_future(aborting).await
    }

    /// [Aborts](https://streams.spec.whatwg.org/#abort-a-writable-stream) the stream with the
    /// given `reason`.
    ///
    /// This signals that the producer can no longer successfully write to the stream, so it
    /// is to be moved to an errored state immediately, discarding any queued-up writes.
    ///
    /// If the stream is currently locked to a writer, then this returns an error.
    pub async fn abort_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
        let aborting = self.raw.abort_with_reason(reason);
        promise_to_void_future(aborting).await
    }

    /// Acquires a [writer](WritableStreamDefaultWriter) and
    /// [locks](https://streams.spec.whatwg.org/#lock) the stream to it.
    ///
    /// No other writer can be acquired while the stream is locked; the lock has to be
    /// released first.
    ///
    /// **Panics** if the stream is already locked to a writer. For a non-panicking variant,
    /// use [`try_get_writer`](Self::try_get_writer).
    #[inline]
    pub fn get_writer(&mut self) -> WritableStreamDefaultWriter {
        let acquired = self.try_get_writer();
        acquired.expect_throw("already locked to a writer")
    }

    /// Attempts to acquire a [writer](WritableStreamDefaultWriter) and
    /// [lock](https://streams.spec.whatwg.org/#lock) the stream to it.
    ///
    /// No other writer can be acquired while the stream is locked; the lock has to be
    /// released first.
    ///
    /// If the stream is already locked to a writer, then this returns an error.
    pub fn try_get_writer(&mut self) -> Result<WritableStreamDefaultWriter, js_sys::Error> {
        let raw = self.raw.get_writer()?;
        Ok(WritableStreamDefaultWriter {
            raw,
            _stream: PhantomData,
        })
    }

    /// Turns this `WritableStream` into a [`Sink`](Sink).
    ///
    /// The sink accepts raw [`JsValue`](JsValue) items and reports raw `JsValue` errors.
    /// Apply [`with`](futures::SinkExt::with) and/or
    /// [`sink_map_err`](futures::SinkExt::sink_map_err) to the returned sink to work with
    /// more appropriate types.
    ///
    /// **Panics** if the stream is already locked to a writer. For a non-panicking variant,
    /// use [`try_into_sink`](Self::try_into_sink).
    #[inline]
    pub fn into_sink(self) -> IntoSink<'static> {
        let converted = self.try_into_sink();
        converted.expect_throw("already locked to a writer")
    }

    /// Attempts to turn this `WritableStream` into a [`Sink`](Sink).
    ///
    /// The sink accepts raw [`JsValue`](JsValue) items and reports raw `JsValue` errors.
    /// Apply [`with`](futures::SinkExt::with) and/or
    /// [`sink_map_err`](futures::SinkExt::sink_map_err) to the returned sink to work with
    /// more appropriate types.
    ///
    /// If the stream is already locked to a writer, then this returns the error
    /// together with the original `WritableStream`, so the caller does not lose it.
    pub fn try_into_sink(self) -> Result<IntoSink<'static>, (js_sys::Error, Self)> {
        // Resolve the lock attempt first, so `self` is free to be moved into the error case.
        let lock_attempt = self.raw.get_writer();
        match lock_attempt {
            Err(err) => Err((err, self)),
            Ok(raw) => {
                let writer = WritableStreamDefaultWriter {
                    raw,
                    _stream: PhantomData,
                };
                Ok(writer.into_sink())
            }
        }
    }
}

impl<Si> From<Si> for WritableStream
where
    Si: Sink<JsValue, Error = JsValue> + 'static,
{
    /// Equivalent to [`from_sink`](Self::from_sink).
    #[inline]
    fn from(sink: Si) -> Self {
        Self::from_sink(sink)
    }
}

/// A [`WritableStreamDefaultWriter`](https://developer.mozilla.org/en-US/docs/Web/API/WritableStreamDefaultWriter)
/// that can be used to write chunks to a [`WritableStream`](WritableStream).
///
/// A writer is obtained from the [`get_writer`](WritableStream::get_writer) method
/// (or its non-panicking variant, [`try_get_writer`](WritableStream::try_get_writer)).
///
/// When the writer is dropped, it automatically [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
#[derive(Debug)]
pub struct WritableStreamDefaultWriter<'stream> {
    /// The wrapped JavaScript writer object.
    raw: sys::WritableStreamDefaultWriter,
    /// Marker that carries the `'stream` lifetime without storing a reference.
    /// NOTE(review): presumably this models an exclusive borrow of the originating
    /// `WritableStream` for as long as the writer is alive — confirm.
    _stream: PhantomData<&'stream mut WritableStream>,
}

impl<'stream> WritableStreamDefaultWriter<'stream> {
    /// Borrows the underlying [JavaScript writer](sys::WritableStreamDefaultWriter).
    #[inline]
    pub fn as_raw(&self) -> &sys::WritableStreamDefaultWriter {
        let Self { raw, .. } = self;
        raw
    }

    /// Waits for the stream to become closed.
    ///
    /// An error is returned if the stream ever errors, or if the writer's lock is
    /// [released](https://streams.spec.whatwg.org/#release-a-lock) before the stream finishes
    /// closing.
    pub async fn closed(&self) -> Result<(), JsValue> {
        let closed = self.raw.closed();
        promise_to_void_future(closed).await
    }

    /// Returns the desired size to fill the stream's internal queue.
    ///
    /// * A negative value means the queue is over-full.
    ///   A producer can use this information to determine the right amount of data to write.
    /// * `None` means the stream cannot be successfully written to
    ///   (due to either being errored, or having an abort queued up).
    /// * Zero is returned if the stream is closed.
    #[inline]
    pub fn desired_size(&self) -> Option<f64> {
        self.raw.desired_size()
    }

    /// Waits until the desired size to fill the stream's internal queue transitions
    /// from non-positive to positive, signaling that it is no longer applying backpressure.
    ///
    /// Once the desired size dips back to zero or below, calling this again yields a new
    /// future that stays pending until the next such transition.
    ///
    /// An error is returned if the stream ever errors, or if the writer's lock is
    /// [released](https://streams.spec.whatwg.org/#release-a-lock) before the stream finishes
    /// closing.
    pub async fn ready(&self) -> Result<(), JsValue> {
        let ready = self.raw.ready();
        promise_to_void_future(ready).await
    }

    /// [Aborts](https://streams.spec.whatwg.org/#abort-a-writable-stream) the stream,
    /// signaling that the producer can no longer successfully write to the stream.
    ///
    /// Equivalent to [`WritableStream.abort`](WritableStream::abort).
    pub async fn abort(&mut self) -> Result<(), JsValue> {
        let aborting = self.raw.abort();
        promise_to_void_future(aborting).await
    }

    /// [Aborts](https://streams.spec.whatwg.org/#abort-a-writable-stream) the stream with the
    /// given `reason`, signaling that the producer can no longer successfully write to the stream.
    ///
    /// Equivalent to [`WritableStream.abort_with_reason`](WritableStream::abort_with_reason).
    pub async fn abort_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
        let aborting = self.raw.abort_with_reason(reason);
        promise_to_void_future(aborting).await
    }

    /// Writes the given `chunk` to the writable stream.
    ///
    /// The write waits until any previous writes have finished successfully, and then sends
    /// the chunk to the underlying sink's `write()` method.
    ///
    /// Returns `Ok(())` upon a successful write, or `Err(error)` if the write fails or the
    /// stream becomes errored before the writing process is initiated.
    ///
    /// What "success" means is up to the underlying sink: it might only indicate that the
    /// chunk has been accepted, and not necessarily that it is safely saved to
    /// its ultimate destination.
    pub async fn write(&mut self, chunk: JsValue) -> Result<(), JsValue> {
        let writing = self.raw.write(chunk);
        promise_to_void_future(writing).await
    }

    /// Closes the stream.
    ///
    /// The underlying sink first finishes processing any previously-written chunks, and only
    /// then invokes its close behavior. During this time any further attempts to write will
    /// fail (without erroring the stream).
    ///
    /// Returns `Ok(())` if all remaining chunks are successfully written and the stream
    /// successfully closes, or `Err(error)` if an error is encountered during this process.
    pub async fn close(&mut self) -> Result<(), JsValue> {
        let closing = self.raw.close();
        promise_to_void_future(closing).await
    }

    /// Turns this `WritableStreamDefaultWriter` into a [`Sink`](Sink).
    ///
    /// This is similar to [`WritableStream.into_sink`](WritableStream::into_sink),
    /// except that the original `WritableStream` remains usable after the returned `Sink`
    /// is dropped. This makes it possible to write only a few chunks through the `Sink`,
    /// and let another writer write more chunks later on.
    #[inline]
    pub fn into_sink(self) -> IntoSink<'stream> {
        let writer = self;
        IntoSink::new(writer)
    }
}

impl Drop for WritableStreamDefaultWriter<'_> {
    /// Releases the writer's lock on the underlying JavaScript writer
    /// when the Rust wrapper goes out of scope.
    fn drop(&mut self) {
        self.raw.release_lock();
    }
}