1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
use crate::common::conn_command_channel::ConnCommandSender;
use crate::common::increase_in_window::IncreaseInWindow;
use crate::common::stream_from_network::StreamFromNetwork;
use crate::common::stream_queue_sync::stream_queue_sync;
use crate::server::increase_in_window::ServerIncreaseInWindow;
use crate::server::stream_handler::ServerRequestStreamHandler;
use crate::server::stream_handler::ServerRequestStreamHandlerHolder;
use crate::server::types::ServerTypes;
use crate::Headers;
use crate::HttpStreamAfterHeaders;
use crate::StreamId;

/// Server-side view of an incoming request: its headers plus the pieces
/// needed to attach a handler for the remainder of the request stream.
///
/// The value borrows the handler slot and the connection command sender
/// for `'a`; both are used when a stream handler is registered.
pub struct ServerRequest<'a> {
    /// Request headers
    pub headers: Headers,
    /// True if the request ends with headers
    pub end_stream: bool,
    /// Id of the stream; copied into the `IncreaseInWindow` handed to the
    /// stream handler callback.
    pub(crate) stream_id: StreamId,
    /// Stream in window size at the moment of request start
    pub(crate) in_window_size: u32,
    /// Slot that receives the boxed stream handler. It must be `None` when
    /// `register_stream_handler` is called and is set to `Some` by it.
    pub(crate) stream_handler: &'a mut Option<ServerRequestStreamHandlerHolder>,
    /// Connection command sender; cloned into the `IncreaseInWindow` given
    /// to the stream handler callback.
    pub(crate) to_write_tx: &'a ConnCommandSender<ServerTypes>,
}

impl<'a> ServerRequest<'a> {
    /// Consume the request and return the stream that follows the headers.
    ///
    /// If `end_stream` is set, an empty stream is returned and no handler is
    /// registered. Otherwise a `stream_queue_sync()` pair is created: its
    /// first half is installed as the stream handler, and its second half is
    /// wrapped in a `StreamFromNetwork` (together with the in-window
    /// controller) to build the returned stream.
    ///
    /// # Panics
    ///
    /// Panics if a stream handler is already registered (see
    /// [`register_stream_handler`](Self::register_stream_handler)).
    pub fn make_stream(self) -> HttpStreamAfterHeaders {
        // Nothing follows the headers: no handler is needed.
        if self.end_stream {
            return HttpStreamAfterHeaders::empty();
        }

        self.register_stream_handler(|window| {
            let (queue_tx, queue_rx) = stream_queue_sync();
            let body = HttpStreamAfterHeaders::from_parts(StreamFromNetwork {
                rx: queue_rx,
                increase_in_window: window.0,
            });
            (queue_tx, body)
        })
    }

    /// Register a synchronous stream handler (the callback will be called
    /// immediately when new data arrives). Note that increasing the in window
    /// size is the handler's responsibility.
    ///
    /// `f` receives a [`ServerIncreaseInWindow`] for this stream and returns
    /// a pair: the handler to install and an arbitrary value that is handed
    /// back to the caller.
    ///
    /// # Panics
    ///
    /// Panics if a stream handler has already been registered.
    pub fn register_stream_handler<F, H, R>(self, f: F) -> R
    where
        F: FnOnce(ServerIncreaseInWindow) -> (H, R),
        H: ServerRequestStreamHandler,
    {
        assert!(self.stream_handler.is_none());

        let window = IncreaseInWindow {
            stream_id: self.stream_id,
            in_window_size: self.in_window_size,
            to_write_tx: self.to_write_tx.clone(),
        };
        let (handler, result) = f(ServerIncreaseInWindow(window));

        // Store the handler in the borrowed slot before returning the
        // callback's result.
        let holder = ServerRequestStreamHandlerHolder(Box::new(handler));
        *self.stream_handler = Some(holder);
        result
    }
}