import re
from unittest import mock
def assert_auth(req):
assert req.path == '/swindon/authorize_connection'
assert req.headers["Host"] == "swindon.internal"
assert req.headers['Content-Type'] == 'application/json'
assert re.match('^swindon/(\d+\.){2}\d+$', req.headers['User-Agent'])
assert 'Authorization' not in req.headers
def assert_headers(req):
assert req.headers["Host"] == "swindon.internal"
assert req.headers['Content-Type'] == 'application/json'
assert re.match('^swindon/(\d+\.){2}\d+$', req.headers['User-Agent'])
async def test_inactivity(proxy_server, swindon, loop):
chat_url = swindon.url / 'swindon-lattice-w-timeouts'
async with proxy_server() as proxy:
handler = proxy.swindon_lattice(chat_url, timeout=1)
req = await handler.request()
assert_auth(req)
ws = await handler.json_response({
"user_id": 'user:1', "username": "Jim"})
hello = await ws.receive_json()
assert hello == [
'hello', {}, {'user_id': 'user:1', 'username': 'Jim'}]
req = await handler.request(timeout=1.2)
assert req.path == '/swindon/session_inactive'
assert_headers(req)
assert req.headers.getall('Authorization') == [
'Tangle eyJ1c2VyX2lkIjoidXNlcjoxIn0='
]
assert await req.json() == [{}, [], {}]
await handler.response(status=204)
await ws.send_json([
'whatever', {'request_id': '1', 'active': 2}, [], {}])
req = await handler.request(timeout=5)
assert req.path == '/whatever'
assert_headers(req)
assert await req.json() == [
{'request_id': '1', 'active': 2, 'connection_id': mock.ANY},
[], {}]
await handler.response(status=200)
req = await handler.request(timeout=3.2)
assert req.path == '/swindon/session_inactive'
assert_headers(req)
assert req.headers.getall('Authorization') == [
'Tangle eyJ1c2VyX2lkIjoidXNlcjoxIn0='
]
assert await req.json() == [{}, [], {}]
await handler.response(status=200)