unit_rs/
response.rs

1use std::io::Write;
2use std::panic::UnwindSafe;
3
4use libc::c_void;
5
6use crate::nxt_unit::{
7    self, nxt_unit_buf_send, nxt_unit_buf_t, nxt_unit_request_info_t, nxt_unit_response_buf_alloc,
8};
9
10use crate::error::{IntoUnitResult, UnitError, UnitResult};
11use crate::request::Request;
12
13/// A buffer for constructing an initial response.
14///
15/// This object is created by calling the
16/// [`create_response()`](Request::create_response) method on a [`Request`].
17///
18/// Dropping this object will _not_ send the response; it must be manually sent
19/// with the [`Response::send()`] method.
20pub struct Response<'a> {
21    pub(crate) request: &'a Request<'a>,
22}
23
24impl<'a> Response<'a> {
25    pub fn add_field<N: AsRef<[u8]>, V: AsRef<[u8]>>(&self, name: N, value: V) -> UnitResult<()> {
26        // SAFETY: Unit's C API does state and buffer size checks internally.
27        // This structure is not Send nor Sync, so sharing it is fine.
28        unsafe {
29            nxt_unit::nxt_unit_response_add_field(
30                self.request.nxt_request,
31                name.as_ref().as_ptr() as *const libc::c_char,
32                name.as_ref().len() as u8,
33                value.as_ref().as_ptr() as *const libc::c_char,
34                value.as_ref().len() as u32,
35            )
36            .into_unit_result()
37        }
38    }
39
40    pub fn add_content<C: AsRef<[u8]>>(&self, content: C) -> UnitResult<()> {
41        // SAFETY: Unit's C API does state and buffer size checks internally.
42        // This structure is not Send nor Sync, so sharing it is fine.
43        unsafe {
44            nxt_unit::nxt_unit_response_add_content(
45                self.request.nxt_request,
46                content.as_ref().as_ptr() as *const c_void,
47                content.as_ref().len() as u32,
48            )
49            .into_unit_result()
50        }
51    }
52
53    pub fn realloc(&self, max_fields_count: usize, max_fields_size: usize) -> UnitResult<()> {
54        // SAFETY: Unit's C API does state and buffer size checks internally.
55        // This structure is not Send nor Sync, so sharing it is fine.
56        unsafe {
57            nxt_unit::nxt_unit_response_realloc(
58                self.request.nxt_request,
59                max_fields_count as u32,
60                max_fields_size as u32,
61            )
62            .into_unit_result()
63        }
64    }
65
66    pub fn send(&self) -> UnitResult<()> {
67        // SAFETY: Unit's C API does state and buffer size checks internally.
68        // This structure is not Send nor Sync, so sharing it is fine.
69        unsafe { nxt_unit::nxt_unit_response_send(self.request.nxt_request).into_unit_result() }
70    }
71}
72
73/// A writer that writes to a Unit shared memory response buffer.
74///
75/// This object is created using [`Request::write_chunks()`] or
76/// [`Request::send_chunks_with_writer()`].
77///
78/// A chunk will be immediately sent to the client once the writer's memory
79/// buffer reaches `chunk_size`, or [`flush()`](std::io::Write::flush) is
80/// called on the writer, and a new shared memory buffer will be allocated.
81///
82/// The writer will also flush when dropped, but any errors that happen during
83/// a drop will panic.
84pub struct BodyWriter<'a> {
85    _lifetime: std::marker::PhantomData<&'a mut ()>,
86    nxt_request: *mut nxt_unit_request_info_t,
87    response_buffer: *mut nxt_unit_buf_t,
88    chunk_cursor: *mut u8,
89    chunk_size: usize,
90    bytes_remaining: usize,
91}
92
93impl UnwindSafe for BodyWriter<'_> {}
94
95impl<'a> BodyWriter<'a> {
96    pub(crate) fn new(request: &'a Request<'a>, chunk_size: usize) -> std::io::Result<Self> {
97        let mut writer = BodyWriter {
98            _lifetime: Default::default(),
99            nxt_request: request.nxt_request,
100            response_buffer: std::ptr::null_mut(),
101            chunk_cursor: std::ptr::null_mut(),
102            chunk_size,
103            bytes_remaining: 0,
104        };
105        writer.allocate_buffer()?;
106        Ok(writer)
107    }
108
109    fn allocate_buffer(&mut self) -> std::io::Result<()> {
110        unsafe {
111            let buf = nxt_unit_response_buf_alloc(self.nxt_request, self.chunk_size as u32);
112
113            if buf.is_null() {
114                return Err(std::io::Error::new(
115                    std::io::ErrorKind::Other,
116                    "Could not allocate response buffer in Unit's shared memory",
117                ));
118            }
119
120            self.response_buffer = buf;
121            self.chunk_cursor = (*buf).start as *mut u8;
122            self.bytes_remaining = self.chunk_size;
123        }
124
125        Ok(())
126    }
127
128    /// Copy from a reader to this writer without using an intermediary buffer.
129    ///
130    /// Normally the [`Write`](std::io::Write) trait receives an input buffer to
131    /// copy from, and the `ResponseWriter` writer will copy from it into Unit's
132    /// shared memory.
133    ///
134    /// This method will instead give Unit's shared memory buffer directly to
135    /// the [`Read`](std::io::Read) trait in order to skip copying to a third
136    /// temporary buffer (such as when using [`std::io::copy`]).
137    pub fn copy_from_reader<R: std::io::Read>(&mut self, mut r: R) -> std::io::Result<()> {
138        loop {
139            if self.bytes_remaining == 0 {
140                self.flush()?;
141                self.allocate_buffer()?;
142            }
143
144            // SAFETY: Allocated by Unit and fully initialized with memset.
145            // TODO: The memset is unnecessary, use std::io::ReadBuf once that
146            // is stabilized.
147            let write_buffer = unsafe {
148                libc::memset(self.chunk_cursor as *mut c_void, 0, self.bytes_remaining);
149                std::slice::from_raw_parts_mut(self.chunk_cursor, self.bytes_remaining)
150            };
151
152            let bytes = r.read(write_buffer)?;
153
154            self.chunk_cursor = unsafe { self.chunk_cursor.add(bytes) };
155            self.bytes_remaining -= bytes;
156
157            if bytes == 0 {
158                break;
159            }
160        }
161
162        return Ok(());
163    }
164}
165
166impl std::io::Write for BodyWriter<'_> {
167    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
168        let buf = if buf.len() >= self.bytes_remaining && !buf.is_empty() {
169            if self.bytes_remaining == 0 {
170                self.flush()?;
171                self.allocate_buffer()?;
172            }
173
174            &buf[..buf.len().min(self.bytes_remaining)]
175        } else {
176            buf
177        };
178
179        // SAFETY: The target region is not initialized and never made
180        // available to the user until it is, so it cannot overlap.
181        // The buffer length is truncated above to fit the target's limit.
182        unsafe {
183            std::ptr::copy_nonoverlapping(buf.as_ptr(), self.chunk_cursor, buf.len());
184            self.chunk_cursor = self.chunk_cursor.add(buf.len());
185        }
186        self.bytes_remaining -= buf.len();
187
188        Ok(buf.len())
189    }
190
191    fn flush(&mut self) -> std::io::Result<()> {
192        if self.response_buffer.is_null() || self.bytes_remaining == self.chunk_size {
193            return Ok(());
194        }
195
196        unsafe {
197            (*self.response_buffer).free = (*self.response_buffer)
198                .start
199                .add(self.chunk_size - self.bytes_remaining);
200            nxt_unit_buf_send(self.response_buffer)
201                .into_unit_result()
202                .map_err(|UnitError(_)| {
203                    std::io::Error::new(
204                        std::io::ErrorKind::Other,
205                        "Could not send response buffer to Unit server",
206                    )
207                })?;
208        }
209
210        self.response_buffer = std::ptr::null_mut();
211        self.bytes_remaining = 0;
212
213        Ok(())
214    }
215}
216
217impl Drop for BodyWriter<'_> {
218    fn drop(&mut self) {
219        if !self.chunk_cursor.is_null() {
220            if std::thread::panicking() {
221                unsafe {
222                    nxt_unit::nxt_unit_buf_free(self.response_buffer);
223                }
224            } else {
225                if let Err(err) = self.flush() {
226                    // Prevent a double-panic, which causes an abort and hides
227                    // details on the initial panic.
228                    if !std::thread::panicking() {
229                        panic!("Error while dropping ResponseWriter: {}", err);
230                    }
231                }
232            }
233        }
234    }
235}