1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use crate::{actors::peer, crypto::PublicKey, wire};
use std::net::SocketAddr;
use tokio::sync::{mpsc, oneshot};

/// Requests carried over the `mpsc` channel wrapped by [`Mailbox`] and
/// [`Oracle`].
///
/// The handler that consumes these messages is not part of this file; the
/// per-variant notes below describe only how the senders in this file build
/// and use each variant.
pub enum Message {
    // Used by oracle
    /// Sent by [`Oracle::register`] with a list of peer public keys and an
    /// associated `index`.
    // NOTE(review): the meaning of `index` is defined by the receiver — confirm there.
    Register {
        index: u64,
        peers: Vec<PublicKey>,
    },

    // Used by peer
    /// Sent by [`Mailbox::construct`]; carries a public key together with the
    /// mailbox of a peer actor (presumably so the receiver can answer it).
    Construct {
        public_key: PublicKey,
        peer: peer::Mailbox,
    },
    /// Sent by [`Mailbox::bit_vec`]; forwards a `wire::BitVec` along with the
    /// mailbox of a peer actor.
    BitVec {
        bit_vec: wire::BitVec,
        peer: peer::Mailbox,
    },
    /// Sent by [`Mailbox::peers`]; forwards a `wire::Peers` payload along with
    /// the mailbox of a peer actor.
    Peers {
        peers: wire::Peers,
        peer: peer::Mailbox,
    },

    // Used by dialer
    /// Sent by [`Mailbox::dialable`]. `peers` is the one-shot reply channel on
    /// which the receiver returns `(public key, socket address, reservation)`
    /// tuples.
    Dialable {
        peers: oneshot::Sender<Vec<(PublicKey, SocketAddr, Reservation)>>,
    },

    // Used by listener
    /// Sent by [`Mailbox::reserve`] for `peer`. `reservation` is the one-shot
    /// reply channel; the answer is either a [`Reservation`] or `None`
    /// (presumably when the reservation is refused — confirm in the receiver).
    Reserve {
        peer: PublicKey,
        reservation: oneshot::Sender<Option<Reservation>>,
    },

    // Used by peer
    /// Sent by [`Mailbox::release`] for `peer`; in this file that call is made
    /// when a [`Reservation`] is dropped.
    Release {
        peer: PublicKey,
    },
}

/// Cloneable handle used to submit [`Message`]s over an `mpsc` channel.
///
/// Cloning the mailbox clones the underlying `mpsc::Sender`.
#[derive(Clone)]
pub struct Mailbox {
    /// Sending half of the channel on which every request is enqueued.
    sender: mpsc::Sender<Message>,
}

impl Mailbox {
    pub(super) fn new(sender: mpsc::Sender<Message>) -> Self {
        Self { sender }
    }

    pub async fn construct(&self, public_key: PublicKey, peer: peer::Mailbox) {
        self.sender
            .send(Message::Construct { public_key, peer })
            .await
            .unwrap();
    }

    pub async fn bit_vec(&self, bit_vec: wire::BitVec, peer: peer::Mailbox) {
        self.sender
            .send(Message::BitVec { bit_vec, peer })
            .await
            .unwrap();
    }

    pub async fn peers(&self, peers: wire::Peers, peer: peer::Mailbox) {
        self.sender
            .send(Message::Peers { peers, peer })
            .await
            .unwrap();
    }

    pub async fn dialable(&self) -> Vec<(PublicKey, SocketAddr, Reservation)> {
        let (response, receiver) = oneshot::channel();
        self.sender
            .send(Message::Dialable { peers: response })
            .await
            .unwrap();
        receiver.await.unwrap()
    }

    pub async fn reserve(&self, peer: PublicKey) -> Option<Reservation> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Message::Reserve {
                peer,
                reservation: tx,
            })
            .await
            .unwrap();
        rx.await.unwrap()
    }

    pub async fn release(&self, peer: PublicKey) {
        self.sender.send(Message::Release { peer }).await.unwrap();
    }
}

/// Cloneable handle over the same [`Message`] channel type as [`Mailbox`];
/// in this file it only exposes peer-set registration.
#[derive(Clone)]
pub struct Oracle {
    /// Sending half of the channel on which registrations are enqueued.
    sender: mpsc::Sender<Message>,
}

impl Oracle {
    pub(super) fn new(sender: mpsc::Sender<Message>) -> Self {
        Self { sender }
    }

    pub async fn register(&self, index: u64, peers: Vec<PublicKey>) {
        let _ = self.sender.send(Message::Register { index, peers }).await;
    }
}

/// Guard tied to a reserved peer: when it is dropped, a release for that peer
/// is sent through the stored [`Mailbox`].
pub struct Reservation {
    /// Peer key and mailbox used to send the release on drop. Wrapped in
    /// `Option` so `Drop` can move the pair out of `&mut self`.
    closer: Option<(PublicKey, Mailbox)>,
}

impl Reservation {
    /// Creates a reservation for `peer` that will report its release through
    /// `mailbox` once the value is dropped.
    pub fn new(peer: PublicKey, mailbox: Mailbox) -> Self {
        let closer = Some((peer, mailbox));
        Self { closer }
    }
}

impl Drop for Reservation {
    /// Reports that the reserved peer has been released.
    ///
    /// `drop` cannot `.await`, so the release is sent from a task spawned on
    /// the current Tokio runtime. This implementation never panics itself:
    ///
    /// * if `closer` has already been taken, there is nothing to do;
    /// * if no Tokio runtime is reachable from this thread (the value is
    ///   dropped outside a runtime, or while thread-locals are being torn
    ///   down), the release is skipped. `tokio::spawn` would panic in that
    ///   situation, and a panic inside `drop` during unwinding aborts the
    ///   process.
    fn drop(&mut self) {
        // Move the pair out of `&mut self`; tolerate an empty slot instead of
        // unwrapping inside a destructor.
        let (peer, mailbox) = match self.closer.take() {
            Some(closer) => closer,
            None => return,
        };

        // Same runtime `tokio::spawn` would have used, but obtained without
        // panicking when there is none.
        let handle = match tokio::runtime::Handle::try_current() {
            Ok(handle) => handle,
            Err(_) => return,
        };

        // Detached task: the returned `JoinHandle` is intentionally dropped.
        handle.spawn(async move {
            mailbox.release(peer).await;
        });
    }
}