use kawa::AsBuffer;
use crate::{
protocol::http::editor::{HeaderEditMode, HeaderEditSnapshot},
socket::{SocketHandler, SocketResult},
};
use super::Stream;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum EndStreamAction {
ForwardTerminated,
CloseDelimited,
ForwardUnterminated,
SendDefault(u16),
Reconnect,
}
pub(super) fn end_stream_decision(stream: &Stream) -> EndStreamAction {
if stream.back.is_main_phase() {
if stream.back.is_terminated() {
EndStreamAction::ForwardTerminated
} else if !stream.context.keep_alive_backend {
EndStreamAction::CloseDelimited
} else {
EndStreamAction::ForwardUnterminated
}
} else if stream.front.consumed {
EndStreamAction::SendDefault(502)
} else {
EndStreamAction::Reconnect
}
}
pub(super) fn drain_tls_close_notify<S: SocketHandler>(
socket: &mut S,
close_notify_sent: &mut bool,
) -> (bool, u32) {
const MAX_DRAIN_ROUNDS: u32 = 16;
if !*close_notify_sent {
socket.socket_close();
*close_notify_sent = true;
}
let mut drain_rounds = 0;
while socket.socket_wants_write() && drain_rounds < MAX_DRAIN_ROUNDS {
let (_size, status) = socket.socket_write_vectored(&[]);
drain_rounds += 1;
match status {
SocketResult::WouldBlock | SocketResult::Error | SocketResult::Closed => break,
SocketResult::Continue => {}
}
}
(socket.socket_wants_write(), drain_rounds)
}
pub(super) fn apply_response_header_edits<T: AsBuffer>(
kawa: &mut kawa::Kawa<T>,
edits: &[HeaderEditSnapshot],
) {
use kawa::{Block, Pair, Store};
if edits.is_empty() {
return;
}
let needs_existing_snapshot = edits
.iter()
.any(|e| matches!(e.mode, HeaderEditMode::SetIfAbsent));
let existing_keys: Vec<Vec<u8>> = if needs_existing_snapshot {
let buf = kawa.storage.buffer();
kawa.blocks
.iter()
.filter_map(|block| {
if let Block::Header(Pair { key, val: _ }) = block {
if matches!(key, Store::Empty) {
None
} else {
Some(key.data(buf).iter().map(u8::to_ascii_lowercase).collect())
}
} else {
None
}
})
.collect()
} else {
Vec::new()
};
let keys_to_drop: Vec<Vec<u8>> = edits
.iter()
.filter(|e| {
matches!(e.mode, HeaderEditMode::Set)
|| (matches!(e.mode, HeaderEditMode::Append) && e.val.is_empty())
})
.map(|e| e.key.iter().map(u8::to_ascii_lowercase).collect())
.collect();
if !keys_to_drop.is_empty() {
let buf = kawa.storage.buffer();
kawa.blocks.retain(|block| {
if let Block::Header(Pair { key, val: _ }) = block {
if matches!(key, Store::Empty) {
return true;
}
let key_lower: Vec<u8> = key.data(buf).iter().map(u8::to_ascii_lowercase).collect();
!keys_to_drop
.iter()
.any(|k| k.as_slice() == key_lower.as_slice())
} else {
true
}
});
}
let end_header_idx = end_of_headers_index(kawa);
let mut to_insert: Vec<Block> = Vec::new();
for edit in edits {
match edit.mode {
HeaderEditMode::Append if edit.val.is_empty() => continue,
HeaderEditMode::Append => {}
HeaderEditMode::SetIfAbsent => {
let key_lower: Vec<u8> = edit.key.iter().map(u8::to_ascii_lowercase).collect();
if existing_keys
.iter()
.any(|k| k.as_slice() == key_lower.as_slice())
{
continue;
}
}
HeaderEditMode::Set => {}
}
to_insert.push(Block::Header(Pair {
key: Store::from_slice(&edit.key),
val: Store::from_slice(&edit.val),
}));
}
if !to_insert.is_empty() {
let insert_at = end_header_idx.unwrap_or(kawa.blocks.len());
for (offset, block) in to_insert.into_iter().enumerate() {
kawa.blocks.insert(insert_at + offset, block);
}
}
}
pub(super) fn end_of_headers_index<T: AsBuffer>(kawa: &kawa::Kawa<T>) -> Option<usize> {
kawa.blocks.iter().position(|b| {
matches!(
b,
kawa::Block::Flags(kawa::Flags {
end_header: true,
..
})
)
})
}
#[cfg(test)]
mod tests {
use super::apply_response_header_edits;
use crate::protocol::http::editor::{HeaderEditMode, HeaderEditSnapshot};
use kawa::{
AsBuffer, Block, Buffer, Flags, Kawa, Kind, Pair, SliceBuffer, StatusLine, Store, Version,
};
fn make_kawa<'a>(buf: &'a mut [u8]) -> Kawa<SliceBuffer<'a>> {
Kawa::new(Kind::Response, Buffer::new(SliceBuffer(buf)))
}
fn pretty_blocks<T: AsBuffer>(kawa: &Kawa<T>) -> Vec<(Vec<u8>, Vec<u8>)> {
let buf = kawa.storage.buffer();
kawa.blocks
.iter()
.filter_map(|b| {
if let Block::Header(Pair { key, val }) = b {
if matches!(key, Store::Empty) {
Some((b"<elided>".to_vec(), b"".to_vec()))
} else {
Some((key.data(buf).to_vec(), val.data(buf).to_vec()))
}
} else {
None
}
})
.collect()
}
#[test]
fn test_response_edit_offsets_with_elided_headers() {
let mut buf = vec![0u8; 4096];
let mut kawa = make_kawa(&mut buf);
kawa.detached.status_line = StatusLine::Response {
version: Version::V11,
code: 200,
status: Store::Static(b"200"),
reason: Store::Static(b"OK"),
};
kawa.blocks.push_back(Block::Header(Pair {
key: Store::Static(b"server"),
val: Store::Static(b"sozu"),
}));
kawa.blocks.push_back(Block::Header(Pair {
key: Store::Empty,
val: Store::Empty,
}));
kawa.blocks.push_back(Block::Header(Pair {
key: Store::Static(b"content-length"),
val: Store::Static(b"3"),
}));
kawa.blocks.push_back(Block::Flags(Flags {
end_body: false,
end_chunk: false,
end_header: true,
end_stream: false,
}));
apply_response_header_edits(
&mut kawa,
&[
HeaderEditSnapshot {
key: b"x-frame-options".to_vec(),
val: b"DENY".to_vec(),
mode: HeaderEditMode::Append,
},
HeaderEditSnapshot {
key: b"x-content-type-options".to_vec(),
val: b"nosniff".to_vec(),
mode: HeaderEditMode::Append,
},
],
);
let names: Vec<Vec<u8>> = pretty_blocks(&kawa).into_iter().map(|(k, _)| k).collect();
assert_eq!(
names,
vec![
b"server".to_vec(),
b"<elided>".to_vec(),
b"content-length".to_vec(),
b"x-frame-options".to_vec(),
b"x-content-type-options".to_vec(),
],
"new headers must land in order, between the last real header and the end-of-headers flag"
);
assert!(
matches!(
kawa.blocks.back(),
Some(Block::Flags(Flags {
end_header: true,
..
}))
),
"end_header flag must remain the last block"
);
}
#[test]
fn test_response_edit_delete_skips_elided_blocks() {
let mut buf = vec![0u8; 4096];
let mut kawa = make_kawa(&mut buf);
kawa.blocks.push_back(Block::Header(Pair {
key: Store::Empty,
val: Store::Empty,
}));
kawa.blocks.push_back(Block::Header(Pair {
key: Store::Static(b"x-internal"),
val: Store::Static(b"secret"),
}));
kawa.blocks.push_back(Block::Header(Pair {
key: Store::Empty,
val: Store::Empty,
}));
kawa.blocks.push_back(Block::Flags(Flags {
end_body: false,
end_chunk: false,
end_header: true,
end_stream: false,
}));
apply_response_header_edits(
&mut kawa,
&[HeaderEditSnapshot {
key: b"X-Internal".to_vec(),
val: Vec::new(),
mode: HeaderEditMode::Append,
}],
);
let names: Vec<Vec<u8>> = pretty_blocks(&kawa).into_iter().map(|(k, _)| k).collect();
assert_eq!(
names,
vec![b"<elided>".to_vec(), b"<elided>".to_vec()],
"x-internal must be dropped; elided blocks must survive the retain pass"
);
}
#[test]
fn test_response_edit_delete_then_set_replaces() {
let mut buf = vec![0u8; 4096];
let mut kawa = make_kawa(&mut buf);
kawa.blocks.push_back(Block::Header(Pair {
key: Store::Static(b"cache-control"),
val: Store::Static(b"public"),
}));
kawa.blocks.push_back(Block::Flags(Flags {
end_body: false,
end_chunk: false,
end_header: true,
end_stream: false,
}));
apply_response_header_edits(
&mut kawa,
&[
HeaderEditSnapshot {
key: b"Cache-Control".to_vec(),
val: Vec::new(),
mode: HeaderEditMode::Append,
},
HeaderEditSnapshot {
key: b"Cache-Control".to_vec(),
val: b"no-store".to_vec(),
mode: HeaderEditMode::Append,
},
],
);
let pairs = pretty_blocks(&kawa);
assert_eq!(pairs.len(), 1, "only the replacement header must remain");
assert_eq!(pairs[0].0, b"Cache-Control");
assert_eq!(pairs[0].1, b"no-store");
}
#[test]
fn test_response_edit_set_if_absent_skips_when_present() {
let mut buf = vec![0u8; 4096];
let mut kawa = make_kawa(&mut buf);
kawa.blocks.push_back(Block::Header(Pair {
key: Store::Static(b"strict-transport-security"),
val: Store::Static(b"max-age=12345"),
}));
kawa.blocks.push_back(Block::Flags(Flags {
end_body: false,
end_chunk: false,
end_header: true,
end_stream: false,
}));
apply_response_header_edits(
&mut kawa,
&[HeaderEditSnapshot {
key: b"strict-transport-security".to_vec(),
val: b"max-age=31536000".to_vec(),
mode: HeaderEditMode::SetIfAbsent,
}],
);
let pairs = pretty_blocks(&kawa);
assert_eq!(
pairs.len(),
1,
"SetIfAbsent must not duplicate an existing header"
);
assert_eq!(pairs[0].0, b"strict-transport-security");
assert_eq!(
pairs[0].1, b"max-age=12345",
"the upstream-supplied STS value must be preserved unchanged"
);
}
#[test]
fn test_response_edit_set_if_absent_inserts_when_absent() {
let mut buf = vec![0u8; 4096];
let mut kawa = make_kawa(&mut buf);
kawa.blocks.push_back(Block::Header(Pair {
key: Store::Static(b"server"),
val: Store::Static(b"sozu"),
}));
kawa.blocks.push_back(Block::Flags(Flags {
end_body: false,
end_chunk: false,
end_header: true,
end_stream: false,
}));
apply_response_header_edits(
&mut kawa,
&[HeaderEditSnapshot {
key: b"strict-transport-security".to_vec(),
val: b"max-age=31536000".to_vec(),
mode: HeaderEditMode::SetIfAbsent,
}],
);
let pairs = pretty_blocks(&kawa);
assert_eq!(
pairs.len(),
2,
"SetIfAbsent must insert exactly one new header when absent"
);
assert_eq!(pairs[0].0, b"server");
assert_eq!(pairs[1].0, b"strict-transport-security");
assert_eq!(pairs[1].1, b"max-age=31536000");
assert!(
matches!(
kawa.blocks.back(),
Some(Block::Flags(Flags {
end_header: true,
..
}))
),
"end_header flag must remain the last block"
);
}
#[test]
fn test_response_edit_set_replaces_existing_header() {
let mut buf = vec![0u8; 4096];
let mut kawa = make_kawa(&mut buf);
kawa.blocks.push_back(Block::Header(Pair {
key: Store::Static(b"Strict-Transport-Security"),
val: Store::Static(b"max-age=300"),
}));
kawa.blocks.push_back(Block::Flags(Flags {
end_body: false,
end_chunk: false,
end_header: true,
end_stream: false,
}));
apply_response_header_edits(
&mut kawa,
&[HeaderEditSnapshot {
key: b"strict-transport-security".to_vec(),
val: b"max-age=31536000; includeSubDomains".to_vec(),
mode: HeaderEditMode::Set,
}],
);
let pairs = pretty_blocks(&kawa);
assert_eq!(
pairs.len(),
1,
"Set must drop the pre-existing header and insert exactly one"
);
assert_eq!(pairs[0].0, b"strict-transport-security");
assert_eq!(pairs[0].1, b"max-age=31536000; includeSubDomains");
}
#[test]
fn test_response_edit_set_inserts_when_absent() {
let mut buf = vec![0u8; 4096];
let mut kawa = make_kawa(&mut buf);
kawa.blocks.push_back(Block::Header(Pair {
key: Store::Static(b"server"),
val: Store::Static(b"sozu"),
}));
kawa.blocks.push_back(Block::Flags(Flags {
end_body: false,
end_chunk: false,
end_header: true,
end_stream: false,
}));
apply_response_header_edits(
&mut kawa,
&[HeaderEditSnapshot {
key: b"strict-transport-security".to_vec(),
val: b"max-age=31536000".to_vec(),
mode: HeaderEditMode::Set,
}],
);
let pairs = pretty_blocks(&kawa);
assert_eq!(pairs.len(), 2);
assert_eq!(pairs[0].0, b"server");
assert_eq!(pairs[1].0, b"strict-transport-security");
assert_eq!(pairs[1].1, b"max-age=31536000");
}
}