1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use crate::adapter_warp::AdapterResponse;
use async_trait::async_trait;
use bytes::Bytes;
use engine_io_server::packet::Packet;
use engine_io_server::transport::{
    PollingResponder, PollingTransportOptions, RequestReply, ResponseBodyData, TransportEvent,
    TransportImpl,
};
use engine_io_server::util::{HttpMethod, RequestContext};
use engine_io_parser::binary::decoder as binaryDecoder;
use engine_io_parser::binary::encoder as binaryEncoder;
use engine_io_parser::string::decoder as stringDecoder;
use engine_io_parser::string::encoder as stringEncoder;
use tokio::sync::mpsc;
use warp::http::header::{HeaderValue, CACHE_CONTROL, CONTENT_TYPE};
use warp::http::Response;

/// State of a single HTTP long-polling transport.
pub struct PollingTransport {
    /// Which polling flavour this transport speaks; `handle_request` only answers
    /// `OPTIONS` requests when this is `Mode::Xhr`.
    pub mode: Mode,
    /// Channel on which the transport emits `TransportEvent`s (`Close` from `close`,
    /// `Drain` from the poll handler).
    pub event_sender: mpsc::Sender<TransportEvent>,
    /// Set to `true` when a poll request arrives and back to `false` by `send` and
    /// `respond_with_packets`; reported through `is_writable`.
    pub writable: bool,
    /// Transport options; `supports_binary` selects the payload encoder and
    /// `http_compression` is consulted when building the response.
    pub options: PollingTransportOptions,
}

/// Flavour of the polling transport.
///
/// `Eq` and `Hash` are derived alongside `PartialEq` so the mode can be used as a
/// map/set key and in contexts that require total equality.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Mode {
    /// XHR polling; the only mode for which `OPTIONS` requests are handled.
    Xhr,
    /// JSONP polling.
    Jsonp,
}

/// `TransportImpl` for the polling transport, producing `AdapterResponse` values.
#[async_trait]
impl TransportImpl<AdapterResponse> for PollingTransport {
    /// Not implemented yet; calling this panics via `unimplemented!()`.
    async fn open(&self) {
        unimplemented!();
    }

    /// Emits `TransportEvent::Close` on the event channel.
    ///
    /// Delivery is best-effort: a send error (the receiving half is gone) is
    /// explicitly discarded, matching how the poll handler treats its `Drain`
    /// notification.
    async fn close(&mut self) {
        let _ = self.event_sender.send(TransportEvent::Close).await;
    }

    /// Not implemented yet; calling this panics via `unimplemented!()`.
    fn discard(&self) {
        unimplemented!();
    }

    /// Marks the transport as no longer writable.
    ///
    /// NOTE: `packets` is currently not written anywhere by this method.
    async fn send(&mut self, packets: Vec<Packet>) {
        self.writable = false;
        // TODO: handle the shouldClose callback business
    }

    /// Returns the current value of the `writable` flag.
    fn is_writable(&self) -> bool {
        self.writable
    }

    /// Dispatches an incoming HTTP request by method and polling mode.
    ///
    /// * `GET` (any mode) goes to the poll handler.
    /// * `POST` (any mode) goes to the data handler.
    /// * `OPTIONS` is handled only in `Mode::Xhr`.
    /// * Every other combination is answered with an empty `500` response.
    async fn handle_request(
        &mut self,
        request_context: &RequestContext,
    ) -> RequestReply<AdapterResponse> {
        match (request_context.http_method, self.mode) {
            (HttpMethod::Get, _) => self.handle_poll_request(request_context).await,
            (HttpMethod::Post, _) => self.handle_data_request(request_context).await,
            (HttpMethod::Options, Mode::Xhr) => self.handle_options_request(request_context),
            (_, _) => self.respond_with_status_code_only(500),
        }
    }
}

impl PollingResponder<AdapterResponse> for PollingTransport {
    /// Encodes `packets` as a single payload and wraps it in an HTTP response.
    ///
    /// The transport is flagged as not writable first. The binary encoder is used
    /// when `options.supports_binary` is set, the string encoder otherwise.
    fn respond_with_packets(
        &mut self,
        request_context: &RequestContext,
        packets: Vec<Packet>,
    ) -> AdapterResponse {
        self.writable = false;

        let compress = self.options.http_compression.is_some();
        let payload: ResponseBodyData = if self.options.supports_binary {
            ResponseBodyData::Binary(binaryEncoder::encode_payload(&packets))
        } else {
            ResponseBodyData::Plaintext(stringEncoder::encode_payload(&packets))
        };

        respond(payload, request_context, compress)
    }
}

impl PollingTransport {
    /// Handles a `GET` poll request.
    ///
    /// Marks the transport writable, emits `TransportEvent::Drain` on the event
    /// channel (best-effort), and replies with a `Drain` action.
    async fn handle_poll_request(
        &mut self,
        request_context: &RequestContext,
    ) -> RequestReply<AdapterResponse> {
        // TODO: handle premature close??
        self.writable = true;

        // A send error only means the mpsc receiver was dropped; it is ignored.
        let _ = self.event_sender.send(TransportEvent::Drain).await;

        // TODO: if we're still writable but had a pending close, trigger an empty send

        // The `Drain` action makes the caller (Socket<A>) flush and then call
        // `respond_with_packets`.
        RequestReply::Action(TransportEvent::Drain)
    }

    /// Handles a `POST` data request; currently a stub answering with an empty `200`.
    async fn handle_data_request(
        &mut self,
        request_context: &RequestContext,
    ) -> RequestReply<AdapterResponse> {
        // TODO: fix this
        self.respond_with_status_code_only(200)
    }

    /// Handles an `OPTIONS` request; currently a stub answering with an empty `200`.
    fn handle_options_request(
        &self,
        request_context: &RequestContext,
    ) -> RequestReply<AdapterResponse> {
        // TODO: fix this
        self.respond_with_status_code_only(200)
    }

    /// Builds a reply with the given status code and an empty body.
    fn respond_with_status_code_only(&self, status_code: u16) -> RequestReply<AdapterResponse> {
        let empty_response = Response::builder()
            .status(status_code)
            .body(Bytes::new())
            .unwrap();
        empty_response.into()
    }
}

/// Builds the `200` response that carries an encoded payload.
///
/// `Content-Type` is `text/plain; charset=UTF-8` for plaintext payloads and
/// `application/octet-stream` for binary ones. `context` and `compress` are
/// accepted but not used yet (see the TODOs below).
fn respond(content: ResponseBodyData, context: &RequestContext, compress: bool) -> AdapterResponse {
    // TODO: compression
    // TODO: Content-Encoding header from encryption
    // TODO: X-XSS-Protection header internet explorer

    let mime_type = match &content {
        ResponseBodyData::Binary(_) => "application/octet-stream",
        ResponseBodyData::Plaintext(_) => "text/plain; charset=UTF-8",
    };
    let content_type_value = HeaderValue::from_static(mime_type);
    let body: Bytes = content.into_bytes();

    let builder = Response::builder()
        .status(200)
        .header(CONTENT_TYPE, content_type_value);
    builder.body(body).unwrap()
}