1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
pub(crate) mod handle;

use self::handle::StreamingBodyHandle;
use super::Body;
use crate::{
    convert::{Borrowable, ToHeaderName, ToHeaderValue},
    experimental::StreamingBodyExt,
};
use http::header::HeaderMap;
use std::io::{BufWriter, Write};

/// A streaming HTTP body that can be written to, or appended to from another body.
///
/// The interface to this type is very similar to `Body`, however it is write-only, and can only be
/// created as a result of calling
/// [`Response::stream_to_client()`][`crate::Response::stream_to_client()`] or
/// [`Request::send_async_streaming()`][`crate::Request::send_async_streaming()`].
///
/// The most efficient way to write the body is through the [`Write`] implementation. Writes are
/// buffered, and automatically flushed, but you can call [`Write::flush()`] to explicitly flush the
/// buffer and cause a new chunk to be written to the client.
///
/// A streaming body handle will be automatically aborted if it goes out of scope without calling
/// [`finish()`][`Self::finish()`].
#[must_use = "streaming bodies must be `.finish()`ed"]
pub struct StreamingBody {
    writer: BufWriter<StreamingBodyHandle>,
}

impl StreamingBody {
    /// Finish writing to a streaming body handle.
    pub fn finish(self) -> std::io::Result<()> {
        self.writer
            .into_inner()?
            .finish()
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
    }

    // this is not exported, since misuse can lead to data getting dropped or appearing out of order
    fn handle(&mut self) -> &mut StreamingBodyHandle {
        self.writer.get_mut()
    }

    /// Append a body onto the end of this streaming body.
    ///
    #[doc = include_str!("../../../docs/snippets/body-append-constant-time.md")]
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use fastly::{Body, Response};
    /// # let beresp = Response::new();
    /// # let other_body = Body::new();
    /// let mut streaming_body = beresp.stream_to_client();
    /// streaming_body.append(other_body);
    /// ```
    pub fn append(&mut self, other: Body) {
        // flush the write buffer of the destination body, so that we can use the append method on
        // the underlying handles
        self.writer.flush().expect("fastly_http_body::write failed");
        self.handle().append(other.into_handle())
    }

    /// Write a slice of bytes to the end of this streaming body, and return the number of bytes written.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # let resp = fastly::Response::new();
    /// let mut streaming_body = resp.stream_to_client();
    /// # #[allow(deprecated)]
    /// streaming_body.write_bytes(&[0, 1, 2, 3]);
    /// ```
    #[deprecated(since = "0.10.0", note = "use std::io::Write::write() instead")]
    pub fn write_bytes(&mut self, bytes: &[u8]) -> usize {
        self.writer
            .write(bytes)
            .expect("fastly_http_body::write failed")
    }

    /// Write a string slice to the end of this streaming body, and return the number of bytes
    /// written.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # let resp = fastly::Response::new();
    /// let mut streaming_body = resp.stream_to_client();
    /// # #[allow(deprecated)]
    /// streaming_body.write_str("woof woof");
    /// ```
    #[deprecated(since = "0.10.0", note = "use std::io::Write::write() instead")]
    pub fn write_str(&mut self, string: &str) -> usize {
        #[allow(deprecated)]
        self.write_bytes(string.as_ref())
    }
}

impl From<StreamingBodyHandle> for StreamingBody {
    fn from(handle: StreamingBodyHandle) -> Self {
        Self {
            writer: BufWriter::new(handle),
        }
    }
}

// This trait implementation is much simpler than those of `Body`, since we don't have to manage
// multiple buffers. It's just a passthrough to the methods defined on `BufWriter`.
impl Write for StreamingBody {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.writer.write(buf)
    }

    fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
        self.writer.write_vectored(bufs)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.writer.flush()
    }
}

impl StreamingBodyExt for StreamingBody {
    fn append_trailer(&mut self, name: impl ToHeaderName, value: impl ToHeaderValue) {
        self.handle().append_trailer(
            name.into_borrowable().as_ref(),
            value.into_borrowable().as_ref(),
        );
    }

    fn finish_with_trailers(mut self, trailers: &HeaderMap) -> Result<(), std::io::Error> {
        self.writer.get_mut().set_trailers(trailers);
        self.writer
            .into_inner()?
            .finish()
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
    }
}