1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
use bones_matchmaker_proto::{MatchInfo, MatchmakerRequest, MatchmakerResponse};
use iroh_net::{endpoint::get_remote_node_id, Endpoint, NodeAddr};
use once_cell::sync::Lazy;
use quinn::{Connection, ConnectionError};
use scc::HashMap;
pub async fn handle_connection(ep: Endpoint, conn: Connection) {
let connection_id = conn.stable_id();
debug!(connection_id, "Accepted matchmaker connection");
if let Err(e) = impl_matchmaker(ep, conn).await {
match e.downcast::<ConnectionError>() {
Ok(conn_err) => match conn_err {
ConnectionError::ApplicationClosed(e) => {
debug!(connection_id, "Application close connection: {e:?}");
}
e => {
error!(connection_id, "Error in matchmaker connection: {e:?}");
}
},
Err(e) => {
error!(connection_id, "Error in matchmaker connection: {e:?}");
}
}
}
}
/// The matchmaker state
#[derive(Default)]
struct State {
/// The mapping of match info to the vector connected clients in the waiting room.
rooms: HashMap<MatchInfo, Vec<Connection>>,
}
static STATE: Lazy<State> = Lazy::new(State::default);
/// After a matchmaker connection is established, it will open a bi-directional channel with the
/// client.
///
/// At this point the client is free to engage in the matchmaking protocol over that channel.
async fn impl_matchmaker(ep: iroh_net::Endpoint, conn: Connection) -> anyhow::Result<()> {
let connection_id = conn.stable_id();
loop {
// Get the next channel open or connection close event
tokio::select! {
close = conn.closed() => {
debug!("Connection closed {close:?}");
return Ok(());
}
bi = conn.accept_bi() => {
let (mut send, mut recv) = bi?;
// Parse matchmaker request
let request: MatchmakerRequest =
postcard::from_bytes(&recv.read_to_end(256).await?)?;
match request {
MatchmakerRequest::RequestMatch(match_info) => {
debug!(connection_id, ?match_info, "Got request for match");
// Accept request
let message = postcard::to_allocvec(&MatchmakerResponse::Accepted)?;
send.write_all(&message).await?;
send.finish().await?;
let player_count = match_info.client_count;
let mut members_to_join = Vec::new();
let mut members_to_notify = Vec::new();
// Make sure room exists
STATE
.rooms
.insert_async(match_info.clone(), Vec::new())
.await
.ok();
STATE
.rooms
.update_async(&match_info, |match_info, members| {
// Add the current client to the room
members.push(conn.clone());
// Spawn task to wait for connction to close and remove it from the room if it does
let conn = conn.clone();
let info = match_info.clone();
tokio::task::spawn(async move {
conn.closed().await;
let members = STATE
.rooms
.update_async(&info, |_, members| {
let mut was_removed = false;
members.retain(|x| {
if x.stable_id() != conn.stable_id() {
true
} else {
was_removed = true;
false
}
});
if was_removed {
Some(members.clone())
} else {
None
}
})
.await
.flatten();
if let Some(members) = members {
let result = async {
let message = postcard::to_allocvec(
&MatchmakerResponse::ClientCount(
members.len().try_into()?
),
)?;
for conn in members {
let mut send = conn.open_uni().await?;
send.write_all(&message).await?;
send.finish().await?;
}
Ok::<(), anyhow::Error>(())
};
result.await.ok();
}
});
let member_count = members.len();
// If we have a complete room
debug!(
?match_info,
"Room now has {}/{} members", member_count, player_count
);
if member_count >= player_count as _ {
// Clear the room
members_to_join.append(members);
} else {
members_to_notify = members.clone();
}
})
.await;
if !members_to_notify.is_empty() {
let message = postcard::to_allocvec(&MatchmakerResponse::ClientCount(
members_to_notify.len().try_into()?
))?;
for conn in members_to_notify {
let mut send = conn.open_uni().await?;
send.write_all(&message).await?;
send.finish().await?;
}
}
if !members_to_join.is_empty() {
// Send the match ID to all of the clients in the room
let mut player_ids = Vec::new();
let random_seed = rand::random();
for (idx, conn) in members_to_join.iter().enumerate() {
let id = get_remote_node_id(conn)?;
let mut addr = NodeAddr::new(id);
if let Some(info) = ep.connection_info(id) {
if let Some(relay_url) = info.relay_url {
addr = addr.with_relay_url(relay_url.relay_url);
}
addr = addr.with_direct_addresses(
info.addrs.into_iter().map(|addr| addr.addr),
);
}
player_ids.push((u32::try_from(idx)?, addr));
}
for (player_idx, conn) in members_to_join.into_iter().enumerate() {
// Respond with success
let message =
postcard::to_allocvec(&MatchmakerResponse::Success {
random_seed,
client_count: player_count,
player_idx: player_idx.try_into()?,
player_ids: player_ids.clone(),
})?;
let mut send = conn.open_uni().await?;
send.write_all(&message).await?;
send.finish().await?;
// Close connection, we are done here
conn.close(0u32.into(), b"done");
}
// cleanup
STATE.rooms.remove_async(&match_info).await;
}
}
}
}
}
}
}