mpc_websocket/services.rs
1//! Services for handling JSON-RPC requests.
2//!
3//! Some methods send notifications to connected clients, see the
4//! method's documentation for more details.
5//!
6//! Notifications sent to connected clients are sent as a tuple
7//! of `String` event name followed by an arbitrary JSON `Value`
8//! payload for the event.
9//!
10//! ## Methods
11//!
12//! These are the JSON-RPC methods clients may call; some methods will broadcast events to connected clients, see the documentation for each method for more information.
13//!
14//! ### Group.create
15//!
16//! * `label`: Human-friendly `String` label for the group.
17//! * `parameters`: [Parameters](Parameters) for key generation and signing.
18//!
19//! Create a new group; the client that calls this method automatically joins the group.
20//!
21//! Returns the UUID for the group.
22//!
23//! ### Group.join
24//!
25//! * `group_id`: The `String` UUID for the group.
26//!
27//! Register the calling client as a member of the group.
28//!
29//! Returns the group object.
30//!
31//! ### Session.create
32//! * `group_id`: The `String` UUID for the group.
33//! * `kind`: The `String` kind of session (either `keygen` or `sign`).
34//!
35//! Create a new session.
36//!
37//! Returns the session object.
38//!
39//! ### Session.join
40//!
41//! * `group_id`: The `String` UUID for the group.
42//! * `session_id`: The `String` UUID for the session.
43//! * `kind`: The `String` kind of session (either `keygen` or `sign`).
44//!
45//! Join an existing session.
46//!
47//! Returns the session object.
48//!
49//! ### Session.signup
50//!
51//! * `group_id`: The `String` UUID for the group.
52//! * `session_id`: The `String` UUID for the session.
53//! * `kind`: The `String` kind of session (either `keygen` or `sign`).
54//!
55//! Register as a co-operating party for a session.
56//!
57//! When the required number of parties have signed up to a session a `sessionSignup` event is emitted to all the clients in the session. For key generation there must be `parties` clients in the session and for signing there must be `threshold + 1` clients registered for the session.
58//!
59//! Returns the party signup number.
60//!
61//! ### Session.load
62//!
63//! * `group_id`: The `String` UUID for the group.
64//! * `session_id`: The `String` UUID for the session.
65//! * `kind`: The `String` kind of session (must be `keygen`).
66//! * `number`: The `u16` party signup number.
67//!
68//! Load a client into a given slot (party signup number). This is used to allow the party signup numbers allocated to saved key shares to be assigned and validated in the context of a session.
69//!
70//! The given `number` must be in range and must be an available slot.
71//!
72//! When the required number of `parties` have been allocated to a session a `sessionLoad` event is emitted to all the clients in the session.
73//!
74//! Returns the party signup number.
75//!
76//! ### Session.participant
77//!
78//! * `group_id`: The `String` UUID for the group.
79//! * `session_id`: The `String` UUID for the session.
80//! * `index`: The `u16` party index.
81//! * `number`: The `u16` party signup number.
82//!
83//! Provide a mapping between receiver indices for a signing session so that the server can locate the connection identifier when relaying peer to peer messages.
84//!
85//! When signing clients must provide an array of the indices used during DKG, the party index is the index (plus one) of each client's local key index in that array. This is the value that the server sees as the receiver when relaying peer to peer messages but the server has no knowledge of this index so client's must register a mapping from the party index to the server-issued party number so that the correct connection id can be resolved.
86//!
87//! Returns an empty response to the caller.
88//!
89//! ### Session.message
90//!
91//! * `group_id`: The `String` UUID for the group.
92//! * `session_id`: The `String` UUID for the session.
93//! * `kind`: The `String` kind of session (either `keygen` or `sign`).
94//! * `message`: The message to broadcast or send peer to peer.
95//!
96//! Relay a message to all the other peers in the session (broadcast) or send directly to another peer.
97//!
98//! A `message` is treated as peer to peer when the `receiver` field is present which should be the party signup `number` for the peer for a keygen session or the party index for a signing session.
99//!
100//! This method is a notification and does not return anything to the caller.
101//!
102//! ### Session.finish
103//!
104//! * `group_id`: The `String` UUID for the group.
105//! * `session_id`: The `String` UUID for the session.
106//! * `number`: The `u16` party signup number.
107//!
108//! Indicate the session has been finished for the calling client.
109//!
110//! When all the clients in a session have called this method the server will emit a `sessionClosed` event to all the clients in the session.
111//!
112//! This method is a notification and does not return anything to the caller.
113//!
114use async_trait::async_trait;
115use json_rpc2::{futures::*, Error, Request, Response, Result, RpcError};
116use serde::{Deserialize, Serialize};
117use serde_json::Value;
118use std::collections::HashMap;
119use std::sync::Arc;
120use thiserror::Error;
121use tokio::sync::{Mutex, RwLock};
122use uuid::Uuid;
123
124use super::server::{
125 Group, Notification, Parameters, Session, SessionKind, State,
126};
127
128/// Error thrown by the JSON-RPC services.
129#[derive(Debug, Error)]
130pub enum ServiceError {
131 /// Error generated when a parties parameter is too small.
132 #[error("parties must be greater than one")]
133 PartiesTooSmall,
134 /// Error generated when a parties parameter is too small.
135 #[error("threshold must be greater than zero")]
136 ThresholdTooSmall,
137 /// Error generated when the threshold exceeds the parties.
138 #[error("threshold must be less than parties")]
139 ThresholdRange,
140 /// Error generated when a group has enough connections.
141 #[error("group {0} is full, cannot accept new connections")]
142 GroupFull(Uuid),
143 /// Error generated when a group does not exist.
144 #[error("group {0} does not exist")]
145 GroupDoesNotExist(Uuid),
146 /// Error generated when a session does not exist.
147 #[error("group {0} does not exist")]
148 SessionDoesNotExist(Uuid),
149 /// Error generated when a party number does not exist.
150 #[error("party {0} does not exist")]
151 PartyDoesNotExist(u16),
152 /// Error generated when a party number does not belong to the caller.
153 #[error("party {0} is not valid in this context")]
154 BadParty(u16),
155 /// Error generated when the receiver for a peer to peer message
156 /// does not exist.
157 #[error("receiver {0} for peer to peer message does not exist")]
158 BadPeerReceiver(u16),
159 /// Error generated when a client connection does not belong to
160 /// the specified group.
161 #[error("client {0} does not belong to the group {1}")]
162 BadConnection(usize, Uuid),
163}
164
165/// Error data indicating the connection should be closed.
166pub const CLOSE_CONNECTION: &str = "close-connection";
167
168/// Method to create a group.
169pub const GROUP_CREATE: &str = "Group.create";
170/// Method to join a group.
171pub const GROUP_JOIN: &str = "Group.join";
172/// Method to create a session.
173pub const SESSION_CREATE: &str = "Session.create";
174/// Method to join a session.
175pub const SESSION_JOIN: &str = "Session.join";
176/// Method to signup a session.
177pub const SESSION_SIGNUP: &str = "Session.signup";
178/// Method to load a party number into a session.
179pub const SESSION_LOAD: &str = "Session.load";
180/// Register a participant lookup for a signing session.
181pub const SESSION_PARTICIPANT: &str = "Session.participant";
182/// Method to broadcast or relay a message peer to peer.
183pub const SESSION_MESSAGE: &str = "Session.message";
184/// Method to indicate a session is finished.
185pub const SESSION_FINISH: &str = "Session.finish";
186
187/// Notification sent when a session has been created.
188///
189/// Used primarily during key generation so other connected
190/// clients can automatically join the session.
191pub const SESSION_CREATE_EVENT: &str = "sessionCreate";
192/// Notification sent when all expected parties have signed
193/// up to a session.
194pub const SESSION_SIGNUP_EVENT: &str = "sessionSignup";
195/// Notification sent when all parties have loaded a party signup
196/// number into a session.
197pub const SESSION_LOAD_EVENT: &str = "sessionLoad";
198/// Notification sent to clients with broadcast or peer to peer messages.
199pub const SESSION_MESSAGE_EVENT: &str = "sessionMessage";
200/// Notification sent when a session has been marked as finished
201/// by all participating clients.
202pub const SESSION_CLOSED_EVENT: &str = "sessionClosed";
203
204type GroupCreateParams = (String, Parameters);
205type SessionCreateParams = (Uuid, SessionKind, Option<Value>);
206type SessionJoinParams = (Uuid, Uuid, SessionKind);
207type SessionSignupParams = (Uuid, Uuid, SessionKind);
208type SessionLoadParams = (Uuid, Uuid, SessionKind, u16);
209type SessionParticipantParams = (Uuid, Uuid, u16, u16);
210type SessionMessageParams = (Uuid, Uuid, SessionKind, Message);
211type SessionFinishParams = (Uuid, Uuid, u16);
212
213// Mimics the `Msg` struct
214// from `round-based` but doesn't care
215// about the `body` data.
216#[derive(Serialize, Deserialize)]
217struct Message {
218 round: u16,
219 sender: u16,
220 receiver: Option<u16>,
221 uuid: String,
222 body: serde_json::Value,
223}
224
225#[derive(Debug, Serialize)]
226struct Proposal {
227 #[serde(rename = "sessionId")]
228 session_id: Uuid,
229 #[serde(rename = "proposalId")]
230 proposal_id: String,
231 message: String,
232}
233
234/// Service for replying to client requests.
235pub struct ServiceHandler;
236
237#[async_trait]
238impl Service for ServiceHandler {
239 type Data = (usize, Arc<RwLock<State>>, Arc<Mutex<Option<Notification>>>);
240
241 async fn handle(
242 &self,
243 req: &Request,
244 ctx: &Self::Data,
245 ) -> Result<Option<Response>> {
246 let response = match req.method() {
247 GROUP_CREATE => {
248 let (conn_id, state, _) = ctx;
249 let params: GroupCreateParams = req.deserialize()?;
250 let (label, parameters) = params;
251
252 // If parties is less than two then may as well
253 // use a standard single-party ECDSA private key
254 if parameters.parties <= 1 {
255 return Err(Error::from(Box::from(
256 ServiceError::PartiesTooSmall,
257 )));
258 // If threshold is zero then it only
259 // takes a single party to sign a request which
260 // defeats the point of MPC
261 } else if parameters.threshold == 0 {
262 return Err(Error::from(Box::from(
263 ServiceError::ThresholdTooSmall,
264 )));
265 // Threshold must be in range `(t + 1) <= n`
266 } else if parameters.threshold >= parameters.parties {
267 return Err(Error::from(Box::from(
268 ServiceError::ThresholdRange,
269 )));
270 }
271
272 let group =
273 Group::new(*conn_id, parameters.clone(), label.clone());
274 let res = serde_json::to_value(group.uuid).unwrap();
275 let mut writer = state.write().await;
276 writer.groups.insert(group.uuid, group);
277 Some((req, res).into())
278 }
279 GROUP_JOIN => {
280 let (conn_id, state, _) = ctx;
281 let group_id: Uuid = req.deserialize()?;
282 let mut writer = state.write().await;
283 if let Some(group) = writer.groups.get_mut(&group_id) {
284 if group.clients.len() == group.params.parties as usize {
285 let error = ServiceError::GroupFull(group_id);
286 let err = RpcError::new(
287 error.to_string(),
288 Some(CLOSE_CONNECTION.to_string()),
289 );
290 Some((req, err).into())
291 } else {
292 if !group.clients.iter().any(|c| c == conn_id) {
293 group.clients.push(*conn_id);
294 }
295 let res = serde_json::to_value(group).unwrap();
296 Some((req, res).into())
297 }
298 } else {
299 return Err(Error::from(Box::from(
300 ServiceError::GroupDoesNotExist(group_id),
301 )));
302 }
303 }
304 SESSION_CREATE => {
305 let (conn_id, state, notification) = ctx;
306 let params: SessionCreateParams = req.deserialize()?;
307 let (group_id, kind, value) = params;
308 let mut writer = state.write().await;
309 let group =
310 get_group_mut(conn_id, &group_id, &mut writer.groups)?;
311 let session = Session::from((kind.clone(), value));
312 let key = session.uuid;
313 group.sessions.insert(key, session.clone());
314
315 if let SessionKind::Keygen = kind {
316 let value =
317 serde_json::to_value((SESSION_CREATE_EVENT, &session))
318 .unwrap();
319 let response: Response = value.into();
320
321 // Notify everyone else in the group a session was created
322 let ctx = Notification::Group {
323 group_id,
324 filter: Some(vec![*conn_id]),
325 response,
326 };
327 let mut writer = notification.lock().await;
328 *writer = Some(ctx);
329 }
330
331 let res = serde_json::to_value(&session).unwrap();
332 Some((req, res).into())
333 }
334 SESSION_JOIN => {
335 let (conn_id, state, _) = ctx;
336 let params: SessionJoinParams = req.deserialize()?;
337 let (group_id, session_id, _kind) = params;
338
339 let mut writer = state.write().await;
340 let group =
341 get_group_mut(conn_id, &group_id, &mut writer.groups)?;
342 if let Some(session) = group.sessions.get_mut(&session_id) {
343 let res = serde_json::to_value(&session).unwrap();
344 Some((req, res).into())
345 } else {
346 return Err(Error::from(Box::from(
347 ServiceError::SessionDoesNotExist(session_id),
348 )));
349 }
350 }
351 SESSION_SIGNUP => {
352 let (conn_id, state, notification) = ctx;
353 let params: SessionSignupParams = req.deserialize()?;
354 let (group_id, session_id, kind) = params;
355
356 let mut writer = state.write().await;
357 let group =
358 get_group_mut(conn_id, &group_id, &mut writer.groups)?;
359 if let Some(session) = group.sessions.get_mut(&session_id) {
360 let party_number = session.signup(*conn_id);
361
362 tracing::info!(party_number, "session signup {}", conn_id);
363
364 // Enough parties are signed up to the session
365 if threshold(
366 &kind,
367 &group.params,
368 session.party_signups.len(),
369 ) {
370 let value = serde_json::to_value((
371 SESSION_SIGNUP_EVENT,
372 &session_id,
373 ))
374 .unwrap();
375 let response: Response = value.into();
376 let ctx = Notification::Session {
377 group_id,
378 session_id,
379 filter: None,
380 response,
381 };
382
383 let mut writer = notification.lock().await;
384 *writer = Some(ctx);
385 }
386
387 let res = serde_json::to_value(party_number).unwrap();
388 Some((req, res).into())
389 } else {
390 return Err(Error::from(Box::from(
391 ServiceError::SessionDoesNotExist(session_id),
392 )));
393 }
394 }
395 // Load an existing party signup into the session
396 // this is used to support loading existing key shares.
397 SESSION_LOAD => {
398 let (conn_id, state, notification) = ctx;
399 let params: SessionLoadParams = req.deserialize()?;
400 let (group_id, session_id, kind, party_number) = params;
401
402 let mut writer = state.write().await;
403 let group =
404 get_group_mut(conn_id, &group_id, &mut writer.groups)?;
405 if let Some(session) = group.sessions.get_mut(&session_id) {
406 let res = serde_json::to_value(party_number).unwrap();
407 match session.load(&group.params, *conn_id, party_number) {
408 Ok(_) => {
409 // Enough parties are loaded into the session
410 if threshold(
411 &kind,
412 &group.params,
413 session.party_signups.len(),
414 ) {
415 let value = serde_json::to_value((
416 SESSION_LOAD_EVENT,
417 &session_id,
418 ))
419 .unwrap();
420 let response: Response = value.into();
421 let ctx = Notification::Session {
422 group_id,
423 session_id,
424 filter: None,
425 response,
426 };
427 let mut writer = notification.lock().await;
428 *writer = Some(ctx);
429 }
430
431 Some((req, res).into())
432 }
433 Err(err) => return Err(Error::from(Box::from(err))),
434 }
435 } else {
436 return Err(Error::from(Box::from(
437 ServiceError::SessionDoesNotExist(session_id),
438 )));
439 }
440 }
441 // Register a participant lookup for a signing session.
442 //
443 // This allows clients to map the receiver party index to a
444 // server issued party number which in turn allows the server
445 // to correctly resolve the connection identifier that we need
446 // to relay for peer to peer rounds.
447 SESSION_PARTICIPANT => {
448 let (conn_id, state, _) = ctx;
449 let params: SessionParticipantParams = req.deserialize()?;
450 let (group_id, session_id, party_index, party_number) = params;
451
452 let mut writer = state.write().await;
453 let group =
454 get_group_mut(conn_id, &group_id, &mut writer.groups)?;
455 if let Some(session) = group.sessions.get_mut(&session_id) {
456 session.participants.insert(party_index, party_number);
457 Some(req.into())
458 } else {
459 return Err(Error::from(Box::from(
460 ServiceError::SessionDoesNotExist(session_id),
461 )));
462 }
463 }
464 // Mark the session as finished for a party.
465 SESSION_FINISH => {
466 let (conn_id, state, notification) = ctx;
467 let params: SessionFinishParams = req.deserialize()?;
468 let (group_id, session_id, party_number) = params;
469
470 let mut writer = state.write().await;
471 let group =
472 get_group_mut(conn_id, &group_id, &mut writer.groups)?;
473 if let Some(session) = group.sessions.get_mut(&session_id) {
474 let existing_signup = session
475 .party_signups
476 .iter()
477 .find(|(s, _)| s == &party_number);
478
479 if let Some((_, conn)) = existing_signup {
480 // The party number must belong to the caller
481 // which we check by comparing connection identifiers
482 if conn != conn_id {
483 return Err(Error::from(Box::from(
484 ServiceError::BadParty(party_number),
485 )));
486 }
487
488 session.finished.insert(party_number);
489
490 let mut signups = session
491 .party_signups
492 .iter()
493 .map(|(n, _)| *n)
494 .collect::<Vec<u16>>();
495 let mut completed = session
496 .finished
497 .iter()
498 .cloned()
499 .collect::<Vec<u16>>();
500
501 signups.sort();
502 completed.sort();
503
504 if signups == completed {
505 let value = serde_json::to_value((
506 SESSION_CLOSED_EVENT,
507 completed,
508 ))
509 .unwrap();
510 let response: Response = value.into();
511
512 let ctx = Notification::Session {
513 group_id,
514 session_id,
515 filter: None,
516 response,
517 };
518
519 let mut writer = notification.lock().await;
520 *writer = Some(ctx);
521 }
522
523 Some(req.into())
524 } else {
525 return Err(Error::from(Box::from(
526 ServiceError::PartyDoesNotExist(party_number),
527 )));
528 }
529 } else {
530 return Err(Error::from(Box::from(
531 ServiceError::SessionDoesNotExist(session_id),
532 )));
533 }
534 }
535 SESSION_MESSAGE => {
536 let (conn_id, state, notification) = ctx;
537 let params: SessionMessageParams = req.deserialize()?;
538 let (group_id, session_id, _kind, msg) = params;
539
540 let reader = state.read().await;
541
542 // Check we have valid group / session
543 let (_group, session) = get_group_session(
544 conn_id,
545 &group_id,
546 &session_id,
547 &reader.groups,
548 )?;
549
550 // Send direct to peer
551 if let Some(receiver) = &msg.receiver {
552 if let Some(s) = session.resolve(*receiver) {
553 let value =
554 serde_json::to_value((SESSION_MESSAGE_EVENT, msg))
555 .unwrap();
556
557 let response: Response = value.into();
558 let message = (s.1, response);
559
560 let ctx = Notification::Relay {
561 messages: vec![message],
562 };
563
564 let mut writer = notification.lock().await;
565 *writer = Some(ctx);
566 } else {
567 return Err(Error::from(Box::from(
568 ServiceError::BadPeerReceiver(*receiver),
569 )));
570 }
571 // Handle broadcast round
572 } else {
573 let value =
574 serde_json::to_value((SESSION_MESSAGE_EVENT, msg))
575 .unwrap();
576 let response: Response = value.clone().into();
577
578 let ctx = Notification::Session {
579 group_id,
580 session_id,
581 filter: Some(vec![*conn_id]),
582 response,
583 };
584
585 let mut writer = notification.lock().await;
586 *writer = Some(ctx);
587 }
588
589 // Must ACK so we indicate the service method exists
590 Some(req.into())
591 }
592 _ => None,
593 };
594 Ok(response)
595 }
596}
597
598fn get_group_mut<'a>(
599 conn_id: &usize,
600 group_id: &Uuid,
601 groups: &'a mut HashMap<Uuid, Group>,
602) -> Result<&'a mut Group> {
603 if let Some(group) = groups.get_mut(group_id) {
604 // Verify connection is part of the group clients
605 if group.clients.iter().any(|c| c == conn_id) {
606 Ok(group)
607 } else {
608 Err(Error::from(Box::from(ServiceError::BadConnection(
609 *conn_id, *group_id,
610 ))))
611 }
612 } else {
613 Err(Error::from(Box::from(ServiceError::GroupDoesNotExist(
614 *group_id,
615 ))))
616 }
617}
618
619fn get_group<'a>(
620 conn_id: &usize,
621 group_id: &Uuid,
622 groups: &'a HashMap<Uuid, Group>,
623) -> Result<&'a Group> {
624 if let Some(group) = groups.get(group_id) {
625 // Verify connection is part of the group clients
626 if group.clients.iter().any(|c| c == conn_id) {
627 Ok(group)
628 } else {
629 Err(Error::from(Box::from(ServiceError::BadConnection(
630 *conn_id, *group_id,
631 ))))
632 }
633 } else {
634 Err(Error::from(Box::from(ServiceError::GroupDoesNotExist(
635 *group_id,
636 ))))
637 }
638}
639
640fn get_group_session<'a>(
641 conn_id: &usize,
642 group_id: &Uuid,
643 session_id: &Uuid,
644 groups: &'a HashMap<Uuid, Group>,
645) -> Result<(&'a Group, &'a Session)> {
646 let group = get_group(conn_id, group_id, groups)?;
647 if let Some(session) = group.sessions.get(session_id) {
648 Ok((group, session))
649 } else {
650 Err(Error::from(Box::from(ServiceError::SessionDoesNotExist(
651 *session_id,
652 ))))
653 }
654}
655
656/// Helper to determine if we met a session party threshold.
657fn threshold(
658 kind: &SessionKind,
659 params: &Parameters,
660 num_entries: usize,
661) -> bool {
662 let parties = params.parties as usize;
663 let threshold = params.threshold as usize;
664 let required_num_entries = match kind {
665 SessionKind::Keygen => parties,
666 SessionKind::Sign => threshold + 1,
667 };
668 num_entries == required_num_entries
669}