1use std::io::Write;
2use std::panic::UnwindSafe;
3
4use libc::c_void;
5
6use crate::nxt_unit::{
7 self, nxt_unit_buf_send, nxt_unit_buf_t, nxt_unit_request_info_t, nxt_unit_response_buf_alloc,
8};
9
10use crate::error::{IntoUnitResult, UnitError, UnitResult};
11use crate::request::Request;
12
13pub struct Response<'a> {
21 pub(crate) request: &'a Request<'a>,
22}
23
24impl<'a> Response<'a> {
25 pub fn add_field<N: AsRef<[u8]>, V: AsRef<[u8]>>(&self, name: N, value: V) -> UnitResult<()> {
26 unsafe {
29 nxt_unit::nxt_unit_response_add_field(
30 self.request.nxt_request,
31 name.as_ref().as_ptr() as *const libc::c_char,
32 name.as_ref().len() as u8,
33 value.as_ref().as_ptr() as *const libc::c_char,
34 value.as_ref().len() as u32,
35 )
36 .into_unit_result()
37 }
38 }
39
40 pub fn add_content<C: AsRef<[u8]>>(&self, content: C) -> UnitResult<()> {
41 unsafe {
44 nxt_unit::nxt_unit_response_add_content(
45 self.request.nxt_request,
46 content.as_ref().as_ptr() as *const c_void,
47 content.as_ref().len() as u32,
48 )
49 .into_unit_result()
50 }
51 }
52
53 pub fn realloc(&self, max_fields_count: usize, max_fields_size: usize) -> UnitResult<()> {
54 unsafe {
57 nxt_unit::nxt_unit_response_realloc(
58 self.request.nxt_request,
59 max_fields_count as u32,
60 max_fields_size as u32,
61 )
62 .into_unit_result()
63 }
64 }
65
66 pub fn send(&self) -> UnitResult<()> {
67 unsafe { nxt_unit::nxt_unit_response_send(self.request.nxt_request).into_unit_result() }
70 }
71}
72
73pub struct BodyWriter<'a> {
85 _lifetime: std::marker::PhantomData<&'a mut ()>,
86 nxt_request: *mut nxt_unit_request_info_t,
87 response_buffer: *mut nxt_unit_buf_t,
88 chunk_cursor: *mut u8,
89 chunk_size: usize,
90 bytes_remaining: usize,
91}
92
93impl UnwindSafe for BodyWriter<'_> {}
94
95impl<'a> BodyWriter<'a> {
96 pub(crate) fn new(request: &'a Request<'a>, chunk_size: usize) -> std::io::Result<Self> {
97 let mut writer = BodyWriter {
98 _lifetime: Default::default(),
99 nxt_request: request.nxt_request,
100 response_buffer: std::ptr::null_mut(),
101 chunk_cursor: std::ptr::null_mut(),
102 chunk_size,
103 bytes_remaining: 0,
104 };
105 writer.allocate_buffer()?;
106 Ok(writer)
107 }
108
109 fn allocate_buffer(&mut self) -> std::io::Result<()> {
110 unsafe {
111 let buf = nxt_unit_response_buf_alloc(self.nxt_request, self.chunk_size as u32);
112
113 if buf.is_null() {
114 return Err(std::io::Error::new(
115 std::io::ErrorKind::Other,
116 "Could not allocate response buffer in Unit's shared memory",
117 ));
118 }
119
120 self.response_buffer = buf;
121 self.chunk_cursor = (*buf).start as *mut u8;
122 self.bytes_remaining = self.chunk_size;
123 }
124
125 Ok(())
126 }
127
128 pub fn copy_from_reader<R: std::io::Read>(&mut self, mut r: R) -> std::io::Result<()> {
138 loop {
139 if self.bytes_remaining == 0 {
140 self.flush()?;
141 self.allocate_buffer()?;
142 }
143
144 let write_buffer = unsafe {
148 libc::memset(self.chunk_cursor as *mut c_void, 0, self.bytes_remaining);
149 std::slice::from_raw_parts_mut(self.chunk_cursor, self.bytes_remaining)
150 };
151
152 let bytes = r.read(write_buffer)?;
153
154 self.chunk_cursor = unsafe { self.chunk_cursor.add(bytes) };
155 self.bytes_remaining -= bytes;
156
157 if bytes == 0 {
158 break;
159 }
160 }
161
162 return Ok(());
163 }
164}
165
166impl std::io::Write for BodyWriter<'_> {
167 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
168 let buf = if buf.len() >= self.bytes_remaining && !buf.is_empty() {
169 if self.bytes_remaining == 0 {
170 self.flush()?;
171 self.allocate_buffer()?;
172 }
173
174 &buf[..buf.len().min(self.bytes_remaining)]
175 } else {
176 buf
177 };
178
179 unsafe {
183 std::ptr::copy_nonoverlapping(buf.as_ptr(), self.chunk_cursor, buf.len());
184 self.chunk_cursor = self.chunk_cursor.add(buf.len());
185 }
186 self.bytes_remaining -= buf.len();
187
188 Ok(buf.len())
189 }
190
191 fn flush(&mut self) -> std::io::Result<()> {
192 if self.response_buffer.is_null() || self.bytes_remaining == self.chunk_size {
193 return Ok(());
194 }
195
196 unsafe {
197 (*self.response_buffer).free = (*self.response_buffer)
198 .start
199 .add(self.chunk_size - self.bytes_remaining);
200 nxt_unit_buf_send(self.response_buffer)
201 .into_unit_result()
202 .map_err(|UnitError(_)| {
203 std::io::Error::new(
204 std::io::ErrorKind::Other,
205 "Could not send response buffer to Unit server",
206 )
207 })?;
208 }
209
210 self.response_buffer = std::ptr::null_mut();
211 self.bytes_remaining = 0;
212
213 Ok(())
214 }
215}
216
217impl Drop for BodyWriter<'_> {
218 fn drop(&mut self) {
219 if !self.chunk_cursor.is_null() {
220 if std::thread::panicking() {
221 unsafe {
222 nxt_unit::nxt_unit_buf_free(self.response_buffer);
223 }
224 } else {
225 if let Err(err) = self.flush() {
226 if !std::thread::panicking() {
229 panic!("Error while dropping ResponseWriter: {}", err);
230 }
231 }
232 }
233 }
234 }
235}