swindon 0.7.8

An HTTP edge (frontend) server with smart websockets support
import re

from unittest import mock


def assert_auth(req):
    """Assert that *req* looks like a swindon connection-authorization call.

    Checks the request path, the ``Host`` and ``Content-Type`` headers, that
    the ``User-Agent`` has the form ``swindon/<major>.<minor>.<patch>``, and
    that no ``Authorization`` header is present.

    Raises:
        AssertionError: if any of the expectations does not hold.
    """
    assert req.path == '/swindon/authorize_connection'
    assert req.headers["Host"] == "swindon.internal"
    assert req.headers['Content-Type'] == 'application/json'
    # Raw string: ``\d`` and ``\.`` are regex escapes, not string escapes.
    # In a plain literal they trigger an invalid-escape-sequence warning.
    assert re.match(r'^swindon/(\d+\.){2}\d+$', req.headers['User-Agent'])
    assert 'Authorization' not in req.headers


def assert_headers(req):
    """Assert that *req* carries the standard headers of a swindon backend call.

    Checks the ``Host`` and ``Content-Type`` headers and that the
    ``User-Agent`` has the form ``swindon/<major>.<minor>.<patch>``.

    Raises:
        AssertionError: if any of the expectations does not hold.
    """
    assert req.headers["Host"] == "swindon.internal"
    assert req.headers['Content-Type'] == 'application/json'
    # Raw string: ``\d`` and ``\.`` are regex escapes, not string escapes.
    # In a plain literal they trigger an invalid-escape-sequence warning.
    assert re.match(r'^swindon/(\d+\.){2}\d+$', req.headers['User-Agent'])


async def test_inactivity(proxy_server, swindon, loop):
    """Verify the ``session_inactive`` notifications sent to the backend.

    Scenario, as driven by the proxied backend ``handler``:

    1. The websocket connection is authorized and the client gets ``hello``.
    2. With no client activity, the backend receives a
       ``/swindon/session_inactive`` call.
    3. The client sends a call whose metadata contains ``'active': 2``; it
       is forwarded to the backend as ``/whatever``.
    4. A second ``/swindon/session_inactive`` call arrives afterwards.
    """
    lattice_url = swindon.url / 'swindon-lattice-w-timeouts'

    async with proxy_server() as proxy:
        handler = proxy.swindon_lattice(lattice_url, timeout=1)

        async def expect_inactive(wait, reply_status):
            # One backend-side "session inactive" round trip: wait for the
            # call, validate it, then answer with the given HTTP status.
            notice = await handler.request(timeout=wait)
            assert notice.path == '/swindon/session_inactive'
            assert_headers(notice)
            # The token is base64 of the JSON object {"user_id":"user:1"}.
            assert notice.headers.getall('Authorization') == [
                'Tangle eyJ1c2VyX2lkIjoidXNlcjoxIn0=',
            ]
            assert await notice.json() == [{}, [], {}]
            await handler.response(status=reply_status)

        # --- connection authorization / handshake ---
        auth_req = await handler.request()
        assert_auth(auth_req)
        ws = await handler.json_response({
            "user_id": 'user:1',
            "username": "Jim",
        })

        greeting = await ws.receive_json()
        expected_greeting = [
            'hello',
            {},
            {'user_id': 'user:1', 'username': 'Jim'},
        ]
        assert greeting == expected_greeting

        # --- first inactivity notification (no client traffic at all) ---
        await expect_inactive(1.2, 204)

        # --- client call carrying an 'active' hint in its metadata ---
        outgoing = ['whatever', {'request_id': '1', 'active': 2}, [], {}]
        await ws.send_json(outgoing)
        call_req = await handler.request(timeout=5)
        assert call_req.path == '/whatever'
        assert_headers(call_req)
        forwarded = await call_req.json()
        # connection_id appears in the forwarded metadata; its value is
        # not pinned here, hence mock.ANY.
        assert forwarded == [
            {'request_id': '1', 'active': 2, 'connection_id': mock.ANY},
            [],
            {},
        ]
        await handler.response(status=200)

        # --- second inactivity notification, after the call above ---
        await expect_inactive(3.2, 200)