1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
//! APIs for manipulating data streams.
//!
use super::error::{Error, StreamError};
use num_traits::FromPrimitive;
#[link(wasm_import_module = "stream")]
extern "C" {
    fn host_write_chunk(
        sd: u32,
        chunk_addr: *const u8,
        chunk_len: u32,
        err_code_ptr: *mut i32,
    ) -> i32;

    fn host_read_chunk(sd: u32, err_ptr: *mut i32) -> i32;

    fn host_pipe_to(readsd: u32, writesd: u32, err_ptr: *mut i32) -> i32;

    fn host_abort_stream(sd: u32, err_ptr: *mut i32) -> i32;

    fn host_close_stream(sd: u32, err_ptr: *mut i32) -> i32;
}

/// The WriteStream is used to stream data.
#[derive(Debug)]
pub struct WriteStream {
    sd: u32,
}

/// A base stream trait that defines a common stream interface.
pub trait BaseStream {
    #[doc(hidden)]
    fn new(sd: u32) -> Self;
    /// Get the stream descriptor of this stream
    fn sd(&self) -> u32;
    /// Cleanly close the stream
    fn close(&self) -> Result<(), StreamError> {
        let mut error_code = 0;
        let res;
        unsafe {
            res = host_close_stream(self.sd(), &mut error_code);
        };

        if res < 0 {
            return Err(StreamError::from(Error::from_i32(error_code).unwrap()));
        }

        Ok(())
    }
}

impl WriteStream {
    /// Sends the given [`&str`] chunk into the stream
    pub fn write_chunk_text(&mut self, text: &str) -> Result<(), StreamError> {
        self.call_host_write_chunk(Vec::from(text))
    }

    /// Sends the given [`Vec<u8>`] chunk into the steam
    pub fn write_chunk_binary(&mut self, bytes: Vec<u8>) -> Result<(), StreamError> {
        self.call_host_write_chunk(bytes)
    }

    fn call_host_write_chunk(&mut self, bytes: Vec<u8>) -> Result<(), StreamError> {
        // &mut self is intentional here -- writing a chunk modifies the stream
        let chunk_slice = bytes.as_slice();
        let chunk_len = chunk_slice.len() as u32;

        let mut error_code: i32 = 0;

        let res;
        unsafe {
            res = host_write_chunk(self.sd, chunk_slice.as_ptr(), chunk_len, &mut error_code);
        };

        if res < 0 {
            return Err(StreamError::from(Error::from_i32(error_code).unwrap()));
        }

        Ok(())
    }

    /// Aborts sending data and closes the stream.
    ///
    /// This method abruptly ends the streaming.
    ///
    /// Difference between [`WriteStream::close()`] and [`WriteStream::abort()`] when HTTP/1.1 is used for streaming:
    ///
    /// - [`WriteStream::close()`] sends a terminating chunk of size zero to signalize the end of the stream
    /// - [`WriteStream::abort()`] stops streaming without sending the terminating chunk
    ///
    /// Note that [`WriteStream::close()`] is not available after [`WriteStream::abort()`] is called.
    pub fn abort(&self) -> Result<(), StreamError> {
        let mut error_code = 0;
        let res;
        unsafe {
            res = host_abort_stream(self.sd(), &mut error_code);
        };

        if res < 0 {
            return Err(StreamError::from(Error::from_i32(error_code).unwrap()));
        }

        Ok(())
    }
}

impl BaseStream for WriteStream {
    fn new(stream_desc: u32) -> Self {
        WriteStream { sd: stream_desc }
    }

    fn sd(&self) -> u32 {
        self.sd
    }
}

/// ReadStream is used to receive streamed data.
#[derive(Debug)]
pub struct ReadStream {
    sd: u32,
}

impl ReadStream {
    /// Reads a chunk of binary data from the stream.
    pub fn read_chunk(&mut self) -> Option<Result<Vec<u8>, StreamError>> {
        // &mut self is intentional here -- reading a chunk
        // changes the state of the stream
        let mut error_code: i32 = 0;
        let response = unsafe { host_read_chunk(self.sd, &mut error_code) };

        if response > 0 {
            Some(Ok(super::result::get_result_bytes(response).unwrap()))
        } else if response == 0 {
            //EOF
            None
        } else {
            return Some(Err(StreamError::from(Error::from_i32(error_code).unwrap())));
        }
    }

    /// Pipes a read stream into a write stream.
    /// After all data is transmitted, both streams are closed.
    pub fn pipe_to(&mut self, ws: &mut WriteStream) -> Result<(), StreamError> {
        // &mut self is intentional here -- reading a chunk
        // changes the state of the stream
        let mut error_code: i32 = 0;
        let response = unsafe { host_pipe_to(self.sd, ws.sd, &mut error_code) };

        if response >= 0 {
            Ok(())
        } else {
            return Err(StreamError::from(Error::from_i32(error_code).unwrap()));
        }
    }

    /// Reads and returns all data from the stream until the end of stream.
    ///
    /// This method does not close the stream.
    pub fn read_all(&mut self) -> Result<Vec<u8>, StreamError> {
        let mut all_bytes = Vec::new();

        while let Some(bytes_res) = self.read_chunk() {
            match bytes_res {
                Ok(mut bytes) => all_bytes.append(&mut bytes),
                Err(err) => {
                    return Err(err);
                }
            }
        }

        return Ok(all_bytes);
    }
}

impl BaseStream for ReadStream {
    fn new(stream_desc: u32) -> Self {
        ReadStream { sd: stream_desc }
    }

    fn sd(&self) -> u32 {
        self.sd
    }
}