condure 1.10.0

HTTP/WebSocket connection manager
Documentation
# this handler holds all connections open

import os
import time
import datetime
import calendar
import tnetstring
import zmq

# How long a connection may go without an inbound message referencing it
# before the expiry sweep drops it. It is added to the main loop's
# millisecond clock, so the unit is milliseconds.
CONN_TTL = 60000
# Minimum spacing between expiry sweeps, compared against the same
# millisecond clock.
EXPIRE_INTERVAL = 60000

# Per-process identity: used as the ROUTER socket identity below and stamped
# into the b'from' field of every outgoing message.
instance_id = 'holdhandler.{}'.format(os.getpid()).encode('utf-8')

ctx = zmq.Context()
# PULL socket: one of the two inbound message sources polled by the main loop.
in_sock = ctx.socket(zmq.PULL)
in_sock.connect('ipc://client-out')
# ROUTER socket: the second inbound source, read as multipart messages.
# The identity is assigned before connect() so the peer can address this
# instance by instance_id.
in_stream_sock = ctx.socket(zmq.ROUTER)
in_stream_sock.identity = instance_id
in_stream_sock.connect('ipc://client-out-stream')
# PUB socket: all outgoing messages, each prefixed with the recipient address.
# NOTE(review): the ipc:// paths presumably match the server's client-mode
# endpoints -- confirm against the server configuration.
out_sock = ctx.socket(zmq.PUB)
out_sock.connect('ipc://client-in')

# Only the two inbound sockets are polled; out_sock is write-only here.
poller = zmq.Poller()
poller.register(in_sock, zmq.POLLIN)
poller.register(in_stream_sock, zmq.POLLIN)

class Connection(object):
    """Book-keeping for a single held-open connection.

    Attributes:
        rid: two-item key for the connection. Item 0 is the address that
            outgoing messages are published to; item 1 is the id placed in
            each message.
        seq: sequence number stamped on the next outgoing message; bumped
            after every send.
        exp_time: expiry deadline; starts as None and is assigned by the
            code that owns the connection.
    """

    def __init__(self, rid):
        self.exp_time = None
        self.seq = 0
        self.rid = rid

    def send_msg(self, msg):
        """Stamp routing fields onto *msg* (in place) and publish it.

        Adds ``from``, ``id`` and ``seq`` to the dict, advances the
        connection's sequence counter, logs the message and sends it on the
        module-level PUB socket, prefixed with the recipient address.
        """
        msg.update({
            b'from': instance_id,
            b'id': self.rid[1],
            b'seq': self.seq,
        })
        self.seq = self.seq + 1

        to_addr = self.rid[0]
        print('OUT {} {}'.format(to_addr, msg))

        # Wire format: "<address> T<tnetstring payload>".
        frame = to_addr + b' T' + tnetstring.dumps(msg)
        out_sock.send(frame)

    def send_header(self):
        """Send the initial 200 OK response head with a text/plain type.

        ``more`` is set to True.
        NOTE(review): presumably this tells the peer that body chunks will
        follow -- confirm against the protocol spec.
        """
        header = {
            b'code': 200,
            b'reason': b'OK',
            b'headers': [[b'Content-Type', b'text/plain']],
            b'more': True,
        }
        self.send_msg(header)

    def send_body(self, data):
        """Send *data* as a body chunk on this connection, with ``more`` True."""
        chunk = {b'body': data, b'more': True}
        self.send_msg(chunk)

def send_body(to_addr, conns, data):
    """Publish one body chunk addressed to several connections at once.

    Args:
        to_addr: address prefix the message is published under.
        conns: iterable of Connection objects; each contributes an
            ``{id, seq}`` entry and has its ``seq`` advanced by one.
        data: body payload placed in the message.

    The message is logged and then sent on the module-level PUB socket with
    ``more`` set to True.
    """
    targets = []
    for conn in conns:
        # Record the sequence number this chunk uses, then advance it.
        entry = {b'id': conn.rid[1], b'seq': conn.seq}
        conn.seq = conn.seq + 1
        targets.append(entry)

    msg = {
        b'from': instance_id,
        b'id': targets,
        b'body': data,
        b'more': True,
    }

    print('OUT {} {}'.format(to_addr, msg))

    # Wire format: "<address> T<tnetstring payload>".
    frame = to_addr + b' T' + tnetstring.dumps(msg)
    out_sock.send(frame)

# Live connections keyed by rid, i.e. the (sender address, connection id) tuple.
conns = {}
# Millisecond timestamp of the last expiry sweep. It has to use the same unit
# as `now` in the loop; seeding it in seconds made the first sweep fire
# immediately instead of after EXPIRE_INTERVAL.
last_exp_time = int(time.time() * 1000)

while True:
    # Wake at least once a second so the expiry sweep still runs when idle.
    socks = dict(poller.poll(1000))

    # At most one message is handled per iteration; in_sock is checked first.
    if socks.get(in_sock) == zmq.POLLIN:
        m_raw = in_sock.recv()
    elif socks.get(in_stream_sock) == zmq.POLLIN:
        m_list = in_stream_sock.recv_multipart()
        # The payload is the third frame of the multipart message.
        m_raw = m_list[2]
    else:
        m_raw = None

    now = int(time.time() * 1000)

    if m_raw is not None:
        # Drop the one-byte prefix (presumably the 'T' format marker that
        # outgoing messages also carry) and decode the tnetstring payload.
        req = tnetstring.loads(m_raw[1:])
        print('IN {}'.format(req))

        m_from = req[b'from']
        m_id = req[b'id']
        m_type = req.get(b'type', b'')

        # The id field is either a single id or a list of {id, ...} entries.
        if isinstance(m_id, list):
            ids = [id_seq[b'id'] for id_seq in m_id]
        else:
            ids = [m_id]

        # Split the referenced ids into connections we already hold (whose
        # TTL gets refreshed) and ones we have not seen yet.
        new_ids = []
        known_conns = []
        for i in ids:
            rid = (m_from, i)

            c = conns.get(rid)
            if c is not None:
                c.exp_time = now + CONN_TTL
                known_conns.append(c)
            else:
                new_ids.append(rid)

        # data
        if not m_type:
            # A typeless message for an unknown id opens a new held
            # connection and answers it with the response header.
            for rid in new_ids:
                c = Connection(rid)
                conns[rid] = c
                c.exp_time = now + CONN_TTL
                c.send_header()
        elif known_conns:
            # Typed messages only apply to connections we already hold. This
            # tests known_conns, not the leftover loop variable, which could
            # be unbound, stale, or reflect only the last id of a batch.
            if m_type == b'keep-alive':
                # Aware UTC "now"; yields the same field values as the
                # deprecated utcnow().
                dt = datetime.datetime.now(datetime.timezone.utc)
                ts = calendar.timegm(dt.utctimetuple())

                body = (
                    'id: TCPKaliMsgTS-{:016x}.\n'
                    'event: message\n'
                    'data: {:04}-{:02}-{:02}T{:02}:{:02}:{:02}\n\n'
                ).format(
                    (ts * 1000000) + dt.microsecond,
                    dt.year,
                    dt.month,
                    dt.day,
                    dt.hour,
                    dt.minute,
                    dt.second
                ).encode()

                send_body(m_from, known_conns, body)
            elif m_type == b'cancel':
                for c in known_conns:
                    # pop() tolerates the same id appearing twice in a batch.
                    conns.pop(c.rid, None)

    # Periodic sweep: drop every connection whose deadline has passed.
    if now >= last_exp_time + EXPIRE_INTERVAL:
        last_exp_time = now

        # Collect first, delete afterwards: the dict must not change size
        # while it is being iterated.
        to_remove = [rid for rid, c in conns.items() if now >= c.exp_time]
        for rid in to_remove:
            print('expired {}'.format(rid))
            del conns[rid]