1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
use crate::common::conn_command_channel::ConnCommandSender;
use crate::common::increase_in_window::IncreaseInWindow;
use crate::common::stream_from_network::StreamFromNetwork;
use crate::common::stream_queue_sync::stream_queue_sync;
use crate::server::increase_in_window::ServerIncreaseInWindow;
use crate::server::stream_handler::ServerRequestStreamHandler;
use crate::server::stream_handler::ServerRequestStreamHandlerHolder;
use crate::server::types::ServerTypes;
use crate::Headers;
use crate::HttpStreamAfterHeaders;
use crate::StreamId;
/// A request as handed to server-side code: the received headers together
/// with the connection-internal pieces needed to attach a handler for the
/// rest of the stream.
pub struct ServerRequest<'a> {
    /// Headers that came with the request.
    pub headers: Headers,
    /// When `true`, `make_stream` returns an empty stream and no stream
    /// handler is registered.
    pub end_stream: bool,
    /// Stream identifier; copied into the `IncreaseInWindow` built by
    /// `register_stream_handler`.
    pub(crate) stream_id: StreamId,
    /// Window size value passed on to the `IncreaseInWindow` built by
    /// `register_stream_handler`.
    pub(crate) in_window_size: u32,
    /// Slot that `register_stream_handler` fills with the boxed handler.
    /// It must still be `None` when a handler is registered.
    pub(crate) stream_handler: &'a mut Option<ServerRequestStreamHandlerHolder>,
    /// Command sender; a clone of it is stored in the `IncreaseInWindow`
    /// given to the handler factory.
    pub(crate) to_write_tx: &'a ConnCommandSender<ServerTypes>,
}
impl<'a> ServerRequest<'a> {
    /// Consumes the request and returns the stream that follows the headers.
    ///
    /// If `end_stream` is set, the result is `HttpStreamAfterHeaders::empty()`
    /// and no handler is registered. Otherwise a `stream_queue_sync` pair is
    /// created: the sending half is registered as the stream handler and the
    /// receiving half is wrapped in a `StreamFromNetwork` that backs the
    /// returned stream.
    ///
    /// # Panics
    ///
    /// Panics (via `register_stream_handler`) if `end_stream` is `false` and
    /// a stream handler is already present.
    pub fn make_stream(self) -> HttpStreamAfterHeaders {
        // Nothing follows the headers: no handler is needed.
        if self.end_stream {
            return HttpStreamAfterHeaders::empty();
        }

        self.register_stream_handler(|window| {
            let (sender, receiver) = stream_queue_sync();
            let body = HttpStreamAfterHeaders::from_parts(StreamFromNetwork {
                rx: receiver,
                increase_in_window: window.0,
            });
            (sender, body)
        })
    }

    /// Consumes the request, installs a stream handler and returns the
    /// companion value produced alongside it.
    ///
    /// `f` receives a `ServerIncreaseInWindow` built from this request's
    /// `stream_id`, `in_window_size` and a clone of `to_write_tx`. It returns
    /// a pair `(handler, result)`: `handler` is boxed and stored in the
    /// request's `stream_handler` slot, and `result` is returned to the caller.
    ///
    /// # Panics
    ///
    /// Panics if the `stream_handler` slot is already occupied.
    pub fn register_stream_handler<F, H, R>(self, f: F) -> R
    where
        F: FnOnce(ServerIncreaseInWindow) -> (H, R),
        H: ServerRequestStreamHandler,
    {
        assert!(self.stream_handler.is_none());

        let window = IncreaseInWindow {
            stream_id: self.stream_id,
            in_window_size: self.in_window_size,
            to_write_tx: self.to_write_tx.clone(),
        };
        let (handler, result) = f(ServerIncreaseInWindow(window));

        // The slot is written only after the factory has returned.
        let holder = ServerRequestStreamHandlerHolder(Box::new(handler));
        *self.stream_handler = Some(holder);

        result
    }
}