key-message-channel 0.1.0

Multi-producer single-consumer queue capable of queuing messages by message key.
Documentation
# 思路

本题目要求实现一个带 Key 的 mpsc channel。因为只有当队列中的消息不与还处于 active 状态的消息相冲突才能够出队,所以:
* 需要有一个结构来保存目前处于 active 状态的所有消息的 Key;
* 当一个消息想要出队时,首先与该结构进行比较,没有交集的情况下才可以出队,否则继续等待;
* 当处于 active 状态的消息被 Drop 时,能够将该结构中该消息的所有 Key 删除掉。

最简单的办法是用一个`HashSet`来保存所有的 active keys,当 receiver 调用`recv`方法时,依次检查队列中的每个消息的 Key 是否与 active keys 有交集,如果没有,将该消息出队;如果所有消息都有交集,则 receiver 进入阻塞状态,直到有 active 的消息被 Drop 了,或者是 sender 送来了新的消息时,再次重复如上步骤。

这种方法的缺点在于,当任意一个 active 的消息被 Drop 后,队列中所有的消息都会被检验一遍,很费时间。

因此想到,可以借鉴等待队列的思想,用一个单独的结构保存有哪些消息在等待哪一个 active key,当某一个 active 的消息被 Drop 后,将会**通知它所对应的若干个等待队列**,在通知的过程中,如果**发现队列中的某一个消息不需要再等待任何 key 了**,则说明这个消息不再与当前任何一个 active 的消息冲突,于是可以将其送到队列中,等待出队。

尽管只有当不与任何 active 的消息冲突时,一个消息才会入队,但这不代表这个消息就一定能够出队,因为队列具有先后顺序,排在该消息之前的消息将会早于该消息出队,一旦出队之后,这些消息就会变成 active 的,从而可能与当前的消息发生冲突。所以在出队之前,需要再次检查消息是否与当前的 active keys 有交集,如果有,则重新将该消息放回到等待队列中继续等待;如果没有,将这个消息的所有 key 添加到 active keys 中,成功出队。

有了这样一个“出队检查”机制后,对于 sender 新送过来的消息,可以让这些消息直接入队,当这些消息即将出队时再检查它们是否有冲突,并放置到相应的等待队列中。

# 数据结构

## `Message`

消息结构体,持有多个 Key。

每个消息结构体还包含着两个引用:一个为当前 channel 的`active_set`引用,一个`Condvar`引用。当消息被 Drop 时,将会:

1. 从`active_set`中把自己的所有 key 删除掉;
2. 通知条件变量,唤醒 receiver。

## `MessageQueue`

消息队列。这个队列是对无锁 mpsc 队列的简单封装,增强了`enqueue`的功能:当有消息入队时,该消息队列将会通知条件变量,唤醒 receiver。

在无锁队列的选择上,选择了一种名为[`jiffy`](https://arxiv.org/abs/2010.14189)的队列实现,这种队列性能较高,且能保证线性一致性。

## `Activator`

该结构体负责管理上述的 active keys 以及等待队列,从而让消息在合适的时候进入到消息队列中。

### `active_set`

一个保存着所有 active keys 的 `HashSet`。

为了降低锁的粒度,在该结构中包含:

* 一个`Mutex<HashSet>`,用于从多个来源修改 active keys;
* 一个`AtomicBool`,名称为`modified`,**标识 active keys 是否被修改,且没有与`wait_list`进行同步**。

通过将锁的粒度缩小,receiver 能够直接读取`modified`来得知当前的`active_set`是否需要进行同步,从而避免不必要的同步。

这里的“多个来源”,指:

1. 当处于 active 状态的消息被 Drop 时,需要在`active_set`中删除该消息的所有 Key;
2. 当`receiver`成功`recv`一个消息后,需要将新的消息的 Key 添加到`active_set`中。

这两个更改来源来自于不同的结构,因此需要将`HashSet`上锁。

### `wait_list`

一个保存着若干 key,以及等待着这个 key 的若干消息的引用的`HashMap`。同时该结构体中还保存着对消息队列的引用。

这个`HashMap`应该与`active_set`是同步的,同步的含义为:

* 当某个 key 被移出了`active_set`,则`wait_list`也会相对应的删除这个 key,并通知所有等待在这个 key 上的消息;
* 当某个 key 被新加入了`active_set`,则`wait_list`也会增加相应的空队列。

由上面可知,当某个 key 不再 active 时,`wait_list`需要有某种方式来通知等待在该 key 上的消息:该 key 已经被释放,不需要再等待这个 key 了。当某个消息没有任何需要等待的 key 时,就可以知道该消息现在不与任何 active 的消息相冲突,因此**可以将该事件添加到消息队列中,等待出队**。

在实际实现上,这个“某种方式”非常简单:等待队列中保存的是`Arc<Message<T>>`,因此当队列被删除时,指针的引用计数就会相应减 1,达成了一种“通知”的效果。当某个消息的引用计数为 1:`Arc::strong_count(&msg) == 1`时,该消息现在不与任何 active 的消息相冲突,于是可以入队。

`wait_list`的改变只有一个来源,即 receiver:当 receiver 被唤醒时,执行`wait_list`与`active_set`的同步,从而改变`wait_list`中的内容。

## `Receiver`

channel 的接收端。

接收端有`recv`函数,用来从队列中接收消息并返回。

`recv`的执行过程为:

1. 首先执行`synchronize`,将不与 active 的消息相冲突的消息放入到队列中;
2. 从队列中取消息,检查消息的 key 是否与当前的`active_set`有交集;
3. 如果有交集,说明队列前面有其他消息先于该消息出队了,因此需要将该消息重新添加回等待队列进行等待,然后继续取下一个消息,重复上述过程;
4. 如果没有交集,则将该消息的 key 添加到`active_set`中,然后将该消息出队;
5. 如果没有可以返回的消息,则挂起,等待条件变量通知,重复上述过程。

`recv`挂起的原因可能有:

* 当前队列为空;
* 当前队列不为空,但队列中的所有消息都与 active 的消息相冲突;

而唤醒`recv`的原因可能有:

* `Sender`在队列中添加了新的消息;
* active 的消息被 Drop 了,导致`active_set`更新。

## `Sender`

channel 的发送端。

发送端有`send`函数,将会在消息队列中添加新的消息。由于消息队列是无锁、无限容量的队列,所以多个 sender(在不上锁的前提下)同时向消息队列中添加消息不会导致并发冲突。