armature_core/
vectored_io.rs

1//! Vectored I/O for HTTP Responses
2//!
3//! This module provides vectored write support using `writev()` to send
4//! multiple buffers (headers + body) in a single syscall, reducing overhead.
5//!
6//! ## Performance Benefits
7//!
8//! Traditional approach:
9//! ```text
10//! write(headers)  → syscall + context switch
11//! write(body)     → syscall + context switch
12//! ```
13//!
14//! Vectored I/O:
15//! ```text
16//! writev([headers, body]) → single syscall
17//! ```
18//!
19//! This can provide +2-3% throughput improvement by:
20//! - Reducing syscall overhead
21//! - Better TCP segment utilization
22//! - Avoiding Nagle's algorithm delays
23//!
24//! ## Usage
25//!
26//! ```rust,ignore
27//! use armature_core::vectored_io::{VectoredResponse, ResponseChunks};
28//!
29//! // Build response with separate header and body buffers
30//! let response = VectoredResponse::new(200)
31//!     .header("Content-Type", "application/json")
32//!     .body(json_bytes);
33//!
34//! // Get IoSlices for writev
35//! let chunks = response.into_io_slices();
36//! ```
37
38use bytes::{BufMut, Bytes, BytesMut};
39use std::collections::HashMap;
40use std::io::IoSlice;
41
42/// Maximum number of IoSlice buffers for vectored writes.
43/// Most responses have: status line + headers + body = 3 parts.
44/// We allow up to 16 for chunked responses with multiple parts.
45pub const MAX_IO_SLICES: usize = 16;
46
47/// Pre-computed common HTTP status lines (avoids formatting on hot path)
48static STATUS_200: &[u8] = b"HTTP/1.1 200 OK\r\n";
49static STATUS_201: &[u8] = b"HTTP/1.1 201 Created\r\n";
50static STATUS_204: &[u8] = b"HTTP/1.1 204 No Content\r\n";
51static STATUS_301: &[u8] = b"HTTP/1.1 301 Moved Permanently\r\n";
52static STATUS_302: &[u8] = b"HTTP/1.1 302 Found\r\n";
53static STATUS_304: &[u8] = b"HTTP/1.1 304 Not Modified\r\n";
54static STATUS_400: &[u8] = b"HTTP/1.1 400 Bad Request\r\n";
55static STATUS_401: &[u8] = b"HTTP/1.1 401 Unauthorized\r\n";
56static STATUS_403: &[u8] = b"HTTP/1.1 403 Forbidden\r\n";
57static STATUS_404: &[u8] = b"HTTP/1.1 404 Not Found\r\n";
58static STATUS_405: &[u8] = b"HTTP/1.1 405 Method Not Allowed\r\n";
59static STATUS_500: &[u8] = b"HTTP/1.1 500 Internal Server Error\r\n";
60static STATUS_502: &[u8] = b"HTTP/1.1 502 Bad Gateway\r\n";
61static STATUS_503: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n";
62
63/// Header separator
64static HEADER_SEP: &[u8] = b": ";
65/// Line ending
66static CRLF: &[u8] = b"\r\n";
67
68/// Get pre-computed status line for common status codes.
69#[inline]
70pub fn status_line(status: u16) -> &'static [u8] {
71    match status {
72        200 => STATUS_200,
73        201 => STATUS_201,
74        204 => STATUS_204,
75        301 => STATUS_301,
76        302 => STATUS_302,
77        304 => STATUS_304,
78        400 => STATUS_400,
79        401 => STATUS_401,
80        403 => STATUS_403,
81        404 => STATUS_404,
82        405 => STATUS_405,
83        500 => STATUS_500,
84        502 => STATUS_502,
85        503 => STATUS_503,
86        _ => STATUS_200, // Fallback, caller should use format_status_line
87    }
88}
89
90/// Check if status code has a pre-computed status line.
91#[inline]
92pub fn has_precomputed_status(status: u16) -> bool {
93    matches!(
94        status,
95        200 | 201 | 204 | 301 | 302 | 304 | 400 | 401 | 403 | 404 | 405 | 500 | 502 | 503
96    )
97}
98
99/// Format a status line for non-common status codes.
100#[inline]
101pub fn format_status_line(status: u16, buf: &mut BytesMut) {
102    buf.extend_from_slice(b"HTTP/1.1 ");
103    // Write status code directly
104    let mut n = status;
105    let d2 = (n % 10) as u8 + b'0';
106    n /= 10;
107    let d1 = (n % 10) as u8 + b'0';
108    n /= 10;
109    let d0 = n as u8 + b'0';
110    buf.put_u8(d0);
111    buf.put_u8(d1);
112    buf.put_u8(d2);
113    buf.extend_from_slice(b" ");
114    buf.extend_from_slice(status_reason(status).as_bytes());
115    buf.extend_from_slice(CRLF);
116}
117
118/// Get reason phrase for status code.
119#[inline]
120fn status_reason(status: u16) -> &'static str {
121    match status {
122        100 => "Continue",
123        101 => "Switching Protocols",
124        200 => "OK",
125        201 => "Created",
126        202 => "Accepted",
127        204 => "No Content",
128        206 => "Partial Content",
129        301 => "Moved Permanently",
130        302 => "Found",
131        303 => "See Other",
132        304 => "Not Modified",
133        307 => "Temporary Redirect",
134        308 => "Permanent Redirect",
135        400 => "Bad Request",
136        401 => "Unauthorized",
137        403 => "Forbidden",
138        404 => "Not Found",
139        405 => "Method Not Allowed",
140        408 => "Request Timeout",
141        409 => "Conflict",
142        410 => "Gone",
143        413 => "Payload Too Large",
144        415 => "Unsupported Media Type",
145        422 => "Unprocessable Entity",
146        429 => "Too Many Requests",
147        500 => "Internal Server Error",
148        501 => "Not Implemented",
149        502 => "Bad Gateway",
150        503 => "Service Unavailable",
151        504 => "Gateway Timeout",
152        _ => "Unknown",
153    }
154}
155
156/// Response chunks for vectored I/O.
157///
158/// This holds all the buffers that make up an HTTP response,
159/// ready to be written with a single `writev()` call.
160#[derive(Debug)]
161pub struct ResponseChunks {
162    /// Status line buffer (either static or formatted)
163    status_line: StatusLine,
164    /// Headers buffer (formatted as "Name: Value\r\n...")
165    headers: BytesMut,
166    /// Header terminator (\r\n)
167    header_end: &'static [u8],
168    /// Body buffer
169    body: Bytes,
170}
171
172#[derive(Debug)]
173enum StatusLine {
174    Static(&'static [u8]),
175    Dynamic(BytesMut),
176}
177
178impl StatusLine {
179    fn as_slice(&self) -> &[u8] {
180        match self {
181            StatusLine::Static(s) => s,
182            StatusLine::Dynamic(b) => b,
183        }
184    }
185}
186
187impl ResponseChunks {
188    /// Create new response chunks from status, headers, and body.
189    pub fn new(status: u16, headers: &HashMap<String, String>, body: Bytes) -> Self {
190        // Status line
191        let status_line = if has_precomputed_status(status) {
192            StatusLine::Static(status_line(status))
193        } else {
194            let mut buf = BytesMut::with_capacity(32);
195            format_status_line(status, &mut buf);
196            StatusLine::Dynamic(buf)
197        };
198
199        // Headers - estimate size: avg 30 bytes per header
200        let mut headers_buf = BytesMut::with_capacity(headers.len() * 30 + 32);
201        for (name, value) in headers {
202            headers_buf.extend_from_slice(name.as_bytes());
203            headers_buf.extend_from_slice(HEADER_SEP);
204            headers_buf.extend_from_slice(value.as_bytes());
205            headers_buf.extend_from_slice(CRLF);
206        }
207
208        // Add Content-Length if body is not empty
209        if !body.is_empty() {
210            headers_buf.extend_from_slice(b"Content-Length: ");
211            // Format number without allocation
212            let len = body.len();
213            let mut num_buf = [0u8; 20];
214            let num_str = format_usize(len, &mut num_buf);
215            headers_buf.extend_from_slice(num_str);
216            headers_buf.extend_from_slice(CRLF);
217        }
218
219        Self {
220            status_line,
221            headers: headers_buf,
222            header_end: CRLF,
223            body,
224        }
225    }
226
227    /// Get the total size of the response.
228    #[inline]
229    pub fn total_len(&self) -> usize {
230        self.status_line.as_slice().len()
231            + self.headers.len()
232            + self.header_end.len()
233            + self.body.len()
234    }
235
236    /// Get IoSlices for vectored write.
237    ///
238    /// Returns slices that can be passed to `writev()`.
239    #[inline]
240    pub fn as_io_slices(&self) -> [IoSlice<'_>; 4] {
241        [
242            IoSlice::new(self.status_line.as_slice()),
243            IoSlice::new(&self.headers),
244            IoSlice::new(self.header_end),
245            IoSlice::new(&self.body),
246        ]
247    }
248
249    /// Get number of chunks.
250    #[inline]
251    pub fn chunk_count(&self) -> usize {
252        if self.body.is_empty() { 3 } else { 4 }
253    }
254
255    /// Serialize to a single contiguous buffer.
256    ///
257    /// Use this when vectored I/O is not available.
258    pub fn to_bytes(&self) -> Bytes {
259        let mut buf = BytesMut::with_capacity(self.total_len());
260        buf.extend_from_slice(self.status_line.as_slice());
261        buf.extend_from_slice(&self.headers);
262        buf.extend_from_slice(self.header_end);
263        buf.extend_from_slice(&self.body);
264        buf.freeze()
265    }
266}
267
268/// Format usize to bytes without allocation.
269#[inline]
270fn format_usize(n: usize, buf: &mut [u8; 20]) -> &[u8] {
271    if n == 0 {
272        buf[19] = b'0';
273        return &buf[19..];
274    }
275
276    let mut n = n;
277    let mut pos = 20;
278    while n > 0 && pos > 0 {
279        pos -= 1;
280        buf[pos] = (n % 10) as u8 + b'0';
281        n /= 10;
282    }
283    &buf[pos..]
284}
285
286/// A response builder optimized for vectored I/O.
287///
288/// Builds responses with separate buffers for efficient `writev()`.
289#[derive(Debug)]
290pub struct VectoredResponse {
291    status: u16,
292    headers: Vec<(String, String)>,
293    body: Option<Bytes>,
294}
295
296impl VectoredResponse {
297    /// Create a new response with status code.
298    #[inline]
299    pub fn new(status: u16) -> Self {
300        Self {
301            status,
302            headers: Vec::with_capacity(8),
303            body: None,
304        }
305    }
306
307    /// Create a 200 OK response.
308    #[inline]
309    pub fn ok() -> Self {
310        Self::new(200)
311    }
312
313    /// Set status code.
314    #[inline]
315    pub fn status(mut self, status: u16) -> Self {
316        self.status = status;
317        self
318    }
319
320    /// Add a header.
321    #[inline]
322    pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
323        self.headers.push((name.into(), value.into()));
324        self
325    }
326
327    /// Set Content-Type header.
328    #[inline]
329    pub fn content_type(self, value: impl Into<String>) -> Self {
330        self.header("Content-Type", value)
331    }
332
333    /// Set body from Bytes.
334    #[inline]
335    pub fn body(mut self, body: impl Into<Bytes>) -> Self {
336        self.body = Some(body.into());
337        self
338    }
339
340    /// Set body from JSON.
341    #[inline]
342    pub fn body_json<T: serde::Serialize>(self, value: &T) -> Result<Self, crate::Error> {
343        let json =
344            crate::json::to_vec(value).map_err(|e| crate::Error::Serialization(e.to_string()))?;
345        Ok(self
346            .content_type("application/json")
347            .body(Bytes::from(json)))
348    }
349
350    /// Build into ResponseChunks for vectored I/O.
351    #[inline]
352    pub fn build(self) -> ResponseChunks {
353        let headers: HashMap<String, String> = self.headers.into_iter().collect();
354        ResponseChunks::new(self.status, &headers, self.body.unwrap_or_default())
355    }
356
357    /// Build and serialize to contiguous bytes.
358    #[inline]
359    pub fn build_bytes(self) -> Bytes {
360        self.build().to_bytes()
361    }
362}
363
364impl Default for VectoredResponse {
365    fn default() -> Self {
366        Self::ok()
367    }
368}
369
370// ============================================================================
371// Conversion from HttpResponse
372// ============================================================================
373
374impl From<crate::HttpResponse> for ResponseChunks {
375    fn from(response: crate::HttpResponse) -> Self {
376        let status = response.status;
377        let headers = response.headers.to_hashmap();
378        let body = response.into_body_bytes();
379        Self::new(status, &headers, body)
380    }
381}
382
383impl crate::HttpResponse {
384    /// Convert to vectored response chunks.
385    ///
386    /// Use this for efficient vectored writes when you have
387    /// direct socket access.
388    #[inline]
389    pub fn into_chunks(self) -> ResponseChunks {
390        ResponseChunks::from(self)
391    }
392
393    /// Get IoSlices for vectored write.
394    ///
395    /// This is a convenience method that creates ResponseChunks
396    /// and returns the slices. For repeated use, prefer `into_chunks()`.
397    #[inline]
398    pub fn to_vectored(&self) -> ResponseChunks {
399        let headers = self.headers.to_hashmap();
400        ResponseChunks::new(self.status, &headers, self.body_bytes())
401    }
402}
403
404// ============================================================================
405// Statistics
406// ============================================================================
407
408use std::sync::atomic::{AtomicU64, Ordering};
409
410/// Statistics for vectored I/O operations.
411#[derive(Debug, Default)]
412pub struct VectoredIoStats {
413    /// Total vectored writes
414    writes: AtomicU64,
415    /// Total bytes written
416    bytes_written: AtomicU64,
417    /// Responses with precomputed status lines
418    precomputed_status: AtomicU64,
419    /// Responses with dynamic status lines
420    dynamic_status: AtomicU64,
421}
422
423impl VectoredIoStats {
424    /// Create new stats.
425    pub fn new() -> Self {
426        Self::default()
427    }
428
429    /// Record a write.
430    #[inline]
431    pub fn record_write(&self, bytes: usize, precomputed: bool) {
432        self.writes.fetch_add(1, Ordering::Relaxed);
433        self.bytes_written
434            .fetch_add(bytes as u64, Ordering::Relaxed);
435        if precomputed {
436            self.precomputed_status.fetch_add(1, Ordering::Relaxed);
437        } else {
438            self.dynamic_status.fetch_add(1, Ordering::Relaxed);
439        }
440    }
441
442    /// Get total writes.
443    pub fn writes(&self) -> u64 {
444        self.writes.load(Ordering::Relaxed)
445    }
446
447    /// Get total bytes written.
448    pub fn bytes_written(&self) -> u64 {
449        self.bytes_written.load(Ordering::Relaxed)
450    }
451
452    /// Get precomputed status line percentage.
453    pub fn precomputed_percentage(&self) -> f64 {
454        let total = self.writes();
455        if total == 0 {
456            return 0.0;
457        }
458        (self.precomputed_status.load(Ordering::Relaxed) as f64 / total as f64) * 100.0
459    }
460}
461
462/// Global vectored I/O statistics.
463static VECTORED_STATS: VectoredIoStats = VectoredIoStats {
464    writes: AtomicU64::new(0),
465    bytes_written: AtomicU64::new(0),
466    precomputed_status: AtomicU64::new(0),
467    dynamic_status: AtomicU64::new(0),
468};
469
470/// Get global vectored I/O statistics.
471pub fn vectored_stats() -> &'static VectoredIoStats {
472    &VECTORED_STATS
473}
474
475// ============================================================================
476// Tests
477// ============================================================================
478
479#[cfg(test)]
480mod tests {
481    use super::*;
482
483    #[test]
484    fn test_status_line_precomputed() {
485        assert_eq!(status_line(200), b"HTTP/1.1 200 OK\r\n");
486        assert_eq!(status_line(404), b"HTTP/1.1 404 Not Found\r\n");
487        assert_eq!(status_line(500), b"HTTP/1.1 500 Internal Server Error\r\n");
488    }
489
490    #[test]
491    fn test_format_status_line() {
492        let mut buf = BytesMut::with_capacity(64);
493        format_status_line(418, &mut buf);
494        assert!(buf.starts_with(b"HTTP/1.1 418"));
495    }
496
497    #[test]
498    fn test_format_usize() {
499        let mut buf = [0u8; 20];
500        assert_eq!(format_usize(0, &mut buf), b"0");
501        assert_eq!(format_usize(123, &mut buf), b"123");
502        assert_eq!(format_usize(1000000, &mut buf), b"1000000");
503    }
504
505    #[test]
506    fn test_response_chunks_basic() {
507        let mut headers = HashMap::new();
508        headers.insert("Content-Type".to_string(), "text/plain".to_string());
509
510        let chunks = ResponseChunks::new(200, &headers, Bytes::from_static(b"Hello"));
511
512        assert!(chunks.total_len() > 0);
513        assert_eq!(chunks.chunk_count(), 4);
514    }
515
516    #[test]
517    fn test_response_chunks_io_slices() {
518        let mut headers = HashMap::new();
519        headers.insert("X-Test".to_string(), "value".to_string());
520
521        let chunks = ResponseChunks::new(200, &headers, Bytes::from_static(b"body"));
522        let slices = chunks.as_io_slices();
523
524        assert_eq!(slices.len(), 4);
525        assert!(!slices[0].is_empty()); // Status line
526        assert!(!slices[1].is_empty()); // Headers
527    }
528
529    #[test]
530    fn test_response_chunks_to_bytes() {
531        let mut headers = HashMap::new();
532        headers.insert("Content-Type".to_string(), "text/plain".to_string());
533
534        let chunks = ResponseChunks::new(200, &headers, Bytes::from_static(b"Hello"));
535        let bytes = chunks.to_bytes();
536
537        let s = String::from_utf8_lossy(&bytes);
538        assert!(s.contains("HTTP/1.1 200 OK"));
539        assert!(s.contains("Content-Type: text/plain"));
540        assert!(s.contains("Hello"));
541    }
542
543    #[test]
544    fn test_vectored_response_builder() {
545        let chunks = VectoredResponse::ok()
546            .header("X-Custom", "test")
547            .body(Bytes::from_static(b"body data"))
548            .build();
549
550        let bytes = chunks.to_bytes();
551        let s = String::from_utf8_lossy(&bytes);
552        assert!(s.contains("X-Custom: test"));
553        assert!(s.contains("body data"));
554    }
555
556    #[test]
557    fn test_vectored_response_json() {
558        #[derive(serde::Serialize)]
559        struct Data {
560            status: &'static str,
561        }
562
563        let chunks = VectoredResponse::ok()
564            .body_json(&Data { status: "ok" })
565            .unwrap()
566            .build();
567
568        let bytes = chunks.to_bytes();
569        let s = String::from_utf8_lossy(&bytes);
570        assert!(s.contains("application/json"));
571        assert!(s.contains(r#""status":"ok""#));
572    }
573
574    #[test]
575    fn test_http_response_to_chunks() {
576        let response = crate::HttpResponse::ok()
577            .with_header("X-Test".to_string(), "value".to_string())
578            .with_body(b"test body".to_vec());
579
580        let chunks = response.into_chunks();
581        let bytes = chunks.to_bytes();
582        let s = String::from_utf8_lossy(&bytes);
583
584        assert!(s.contains("HTTP/1.1 200 OK"));
585        assert!(s.contains("X-Test: value"));
586        assert!(s.contains("test body"));
587    }
588
589    #[test]
590    fn test_empty_body() {
591        let chunks = ResponseChunks::new(204, &HashMap::new(), Bytes::new());
592        assert_eq!(chunks.chunk_count(), 3); // No body chunk
593    }
594
595    #[test]
596    fn test_content_length_header() {
597        let chunks = ResponseChunks::new(200, &HashMap::new(), Bytes::from_static(b"12345"));
598        let bytes = chunks.to_bytes();
599        let s = String::from_utf8_lossy(&bytes);
600        assert!(s.contains("Content-Length: 5"));
601    }
602}