sockjs 0.3.0

SockJS server for Actix
Documentation
use std::time::Duration;
use std::marker::PhantomData;

use actix::*;
use actix_web::*;
use actix_web::http::Method;
use actix_web::http::header::ACCESS_CONTROL_ALLOW_METHODS;

use context::ChannelItem;
use protocol::{Frame, CloseCode};
use utils::SockjsHeaders;
use session::Session;
use manager::{Broadcast, Record, SessionManager};

use super::{Transport, SendResult, Flags};


const OPEN_SEQ: &str =
    "hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\
     hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh\n";

pub struct XhrStreaming<S, SM>
    where S: Session, SM: SessionManager<S>,
{
    s: PhantomData<S>,
    sm: PhantomData<SM>,
    size: usize,
    maxsize: usize,
    flags: Flags,
    rec: Option<Record>,
}

impl<S, SM> XhrStreaming<S, SM> where S: Session, SM: SessionManager<S> {

    pub fn init(req: HttpRequest<Addr<Syn, SM>>, maxsize: usize) -> Result<HttpResponse> {
        if *req.method() == Method::OPTIONS {
            return Ok(
                HttpResponse::NoContent()
                    .content_type("application/jsonscript; charset=UTF-8")
                    .header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, POST")
                    .sockjs_cache_headers()
                    .sockjs_cors_headers(req.headers())
                    .sockjs_session_cookie(&req)
                    .finish())
        } else if *req.method() != Method::POST {
            return Ok(HttpResponse::NotFound().into())
        }

        let session = req.match_info().get("session").unwrap().to_owned();
        let mut resp = HttpResponse::Ok()
            .content_type("application/javascript; charset=UTF-8")
            .force_close()
            .sockjs_no_cache()
            .sockjs_session_cookie(&req)
            .sockjs_cors_headers(req.headers())
            .take();

        let mut ctx = HttpContext::new(
            req, XhrStreaming{maxsize,
                              s: PhantomData,
                              sm: PhantomData,
                              size: 0,
                              flags: Flags::empty(),
                              rec: None});
        ctx.write(OPEN_SEQ);

        // init transport, but aftre prelude only
        ctx.drain().map(move |_, _, ctx| {
            ctx.run_later(Duration::new(0, 1_200_000), move |act, ctx| {
                act.init_transport(session, ctx);
            });
        }).wait(&mut ctx);

        Ok(resp.body(ctx))
    }
}

// Http actor implementation
impl<S, SM> Actor for XhrStreaming<S, SM>
    where S: Session, SM: SessionManager<S>
{
    type Context = HttpContext<Self, Addr<Syn, SM>>;

    fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
        self.release(ctx);
        Running::Stop
    }
}

// Transport implementation
impl<S, SM> Transport<S, SM> for XhrStreaming<S, SM>
    where S: Session, SM: SessionManager<S>,
{
    fn send(&mut self,
            ctx: &mut Self::Context,
            msg: &Frame, record: &mut Record) -> SendResult
    {
        self.size += match *msg {
            Frame::Heartbeat => {
                ctx.write("h\n");
                2
            },
            Frame::Message(ref s) => {
                let s = format!("a[{:?}]\n", s);
                let size = s.len();
                ctx.write(s);
                size
            }
            Frame::MessageVec(ref s) => {
                let s = format!("a{}\n", s);
                let size = s.len();
                ctx.write(s);
                size
            }
            Frame::MessageBlob(_) => {
                // ctx.write(format!("a{}\n", s));
                0
            }
            Frame::Open => {
                ctx.write("o\n");
                2
            },
            Frame::Close(code) => {
                record.close();
                ctx.write(format!("c[{},{:?}]\n", code.num(), code.reason()));
                ctx.write_eof();
                return SendResult::Stop;
            }
        };

        if self.size > self.maxsize {
            ctx.write_eof();
            SendResult::Stop
        } else {
            SendResult::Continue
        }
    }

    fn send_heartbeat(&mut self, ctx: &mut Self::Context) {
        ctx.write("h\n");
    }

    fn send_close(&mut self, ctx: &mut Self::Context, code: CloseCode) {
        ctx.write(format!("c[{},{:?}]\n", code.num(), code.reason()));
    }

    fn session_record(&mut self) -> &mut Option<Record> {
        &mut self.rec
    }

    fn flags(&mut self) -> &mut Flags {
        &mut self.flags
    }
}

impl<S, SM> Handler<ChannelItem> for XhrStreaming<S, SM>
    where S: Session, SM: SessionManager<S>,
{
    type Result = ();

    fn handle(&mut self, msg: ChannelItem, ctx: &mut Self::Context) {
        self.handle_message(msg, ctx)
    }
}

impl<S, SM> Handler<Broadcast> for XhrStreaming<S, SM>
    where S: Session, SM: SessionManager<S>,
{
    type Result = ();

    fn handle(&mut self, msg: Broadcast, ctx: &mut Self::Context) {
        self.handle_broadcast(msg, ctx)
    }
}