Broker constitutes of following components
----------------
* N connections. 1 per client
* Router which holds all subscription information and handles to
communicate with all the clients. This might not be super efficient as
data travels from connection 1 -> router -> connection 2. But might
not be significant if prevent clones (just send fat pointers) and batch
* Same router which reads device state from all the connections and actions
from the backend
* Timestone which reads all the data from all the connections and writes
to disk and tries to simulate kafka
Device id as part of topic in subscriptions and publishes
-----------------
* Maybe useful to directly send publishes to backend (where router picks up device id directly)
* Subscriptions only receive data directed to it
* Get in the way of wildcards inplace of device ids
Broker cluster and replication
-----------------
* State sits with connection at this time. Connection receives publishes
acks and forwards data to router.
* This might get in the way of HA and replication as connection acks
without not being sure if router has replicated the data
Current design:
connection 1 -> router -> connection 2
| |
ack replication
Alternate:
connection 1 -> router -> connection 2
|
ack <- replication
But this puts the overhead of sending all acks back to the connection
over a channel and receiving acks of forwards to update the state. Maybe
we can microbatch smartly.
Advantages of router maintaining the state
---------------
* Necessary for replication to happen centrally. Router maintains
connections to all the other brokers with out each connection
maintaining these. Router replicates and sends ack to the connection
where it's written to the network
* All distributed logic at one place
* Connections are stateless
Disadvantages
-------------
* Acks from/to router. Adds to processing. Microbatching can help here
Shared router experiment
-------------
* Router maintains all the state of connections
Connections just forward data to the router
* Router can be a shared with multiple connections on a thread with Rc
This can make the router forwarding asynchronous
Having a router as a separate thread makes asynchronous forwarding
difficult as number of connections are dynamic and we need to `select`
over them
* 4 threaded broker will have 4 routers which caters n connections
This is a good middle ground between each connection having knowledge of
all the other connections vs single router thread maintaining all the
connection knowledge.
Each router now has info about all it's connections and the other
routers
* Routers will talk to each other to forward publisher to a connection
on different router
We don't want to forward data directly to the connection from a random
router as we want to save the outgoing state of a connection on a
single router. Prevents fragmentation
* This inter router communication can be extended across network to make
the broker distributed
Findings:
-------
* Need to separate mutable and immutable parts well
* Can't use tokio channel as sends need mutable borrows
* Even async std channels are a problem because the router can block
during `rc_router_tx.send()` causing Borrow mut panic while creating a
new connection
* Difficult to see Bandwidth of router and connections separately
Distributed commit log
----------------
* Router design should be extend to be a distributed commit log like
kafka
* But doing disk operations on router (which is shared by async
connections) is not correct
We probably can use BufWriter to fill the file and forward it to a
different thread?
* We are now copying the outoing state of each connection. We need to
save a single commitlog per topic make make the subscribers just hold
offset like kafka does
TODO
---------------
* Use hash of connection id strings to numbers and validate perf
* Make sure that whole router doesn't slowdown because of one slow connection
* Inflight message limits. This should be similar to client. Slow acks should
throttle down the connection eventloop channel receiver. This
pushes the backpressure to the router. But this should not slow down
other connections. Essentially everything should be concurrent in a
select! in the router
* Add throughput metrics to every connection. Logging based
instrumentation will be very handy here but we don't have a design on
dashboards on top of log based metrics
* Production ready accept loop. https://github.com/async-rs/async-std/pull/666/files
* Better errors when tls connection happens on tcp port
```
Client side error:
Received = StreamEnd(Network(Io(Custom { kind: UnexpectedEof, error: "tls handshake eof" })))
Server side error:
ERROR librumqd::connection > Connect packet error = Timeout(Elapsed(()))
```
References
--------------
* https://bulldog2011.github.io/blog/2013/03/27/the-architecture-and-design-of-a-pub-sub-messaging-system/