mpc_websocket/
services.rs

1//! Services for handling JSON-RPC requests.
2//!
3//! Some methods send notifications to connected clients, see the
4//! method's documentation for more details.
5//!
6//! Notifications sent to connected clients are sent as a tuple
7//! of `String` event name followed by an arbitrary JSON `Value`
8//! payload for the event.
9//!
10//! ## Methods
11//!
12//! These are the JSON-RPC methods clients may call; some methods will broadcast events to connected clients, see the documentation for each method for more information.
13//!
14//! ### Group.create
15//!
16//! * `label`: Human-friendly `String` label for the group.
17//! * `parameters`: [Parameters](Parameters) for key generation and signing.
18//!
19//! Create a new group; the client that calls this method automatically joins the group.
20//!
21//! Returns the UUID for the group.
22//!
23//! ### Group.join
24//!
25//! * `group_id`: The `String` UUID for the group.
26//!
27//! Register the calling client as a member of the group.
28//!
29//! Returns the group object.
30//!
31//! ### Session.create
32//! * `group_id`: The `String` UUID for the group.
33//! * `kind`: The `String` kind of session (either `keygen` or `sign`).
34//!
35//! Create a new session.
36//!
37//! Returns the session object.
38//!
39//! ### Session.join
40//!
41//! * `group_id`: The `String` UUID for the group.
42//! * `session_id`: The `String` UUID for the session.
43//! * `kind`: The `String` kind of session (either `keygen` or `sign`).
44//!
45//! Join an existing session.
46//!
47//! Returns the session object.
48//!
49//! ### Session.signup
50//!
51//! * `group_id`: The `String` UUID for the group.
52//! * `session_id`: The `String` UUID for the session.
53//! * `kind`: The `String` kind of session (either `keygen` or `sign`).
54//!
55//! Register as a co-operating party for a session.
56//!
57//! When the required number of parties have signed up to a session a `sessionSignup` event is emitted to all the clients in the session. For key generation there must be `parties` clients in the session and for signing there must be `threshold + 1` clients registered for the session.
58//!
59//! Returns the party signup number.
60//!
61//! ### Session.load
62//!
63//! * `group_id`: The `String` UUID for the group.
64//! * `session_id`: The `String` UUID for the session.
65//! * `kind`: The `String` kind of session (must be `keygen`).
66//! * `number`: The `u16` party signup number.
67//!
68//! Load a client into a given slot (party signup number). This is used to allow the party signup numbers allocated to saved key shares to be assigned and validated in the context of a session.
69//!
70//! The given `number` must be in range and must be an available slot.
71//!
72//! When the required number of `parties` have been allocated to a session a `sessionLoad` event is emitted to all the clients in the session.
73//!
74//! Returns the party signup number.
75//!
76//! ### Session.participant
77//!
78//! * `group_id`: The `String` UUID for the group.
79//! * `session_id`: The `String` UUID for the session.
80//! * `index`: The `u16` party index.
81//! * `number`: The `u16` party signup number.
82//!
83//! Provide a mapping between receiver indices for a signing session so that the server can locate the connection identifier when relaying peer to peer messages.
84//!
85//! When signing clients must provide an array of the indices used during DKG, the party index is the index (plus one) of each client's local key index in that array. This is the value that the server sees as the receiver when relaying peer to peer messages but the server has no knowledge of this index so client's must register a mapping from the party index to the server-issued party number so that the correct connection id can be resolved.
86//!
87//! Returns an empty response to the caller.
88//!
89//! ### Session.message
90//!
91//! * `group_id`: The `String` UUID for the group.
92//! * `session_id`: The `String` UUID for the session.
93//! * `kind`: The `String` kind of session (either `keygen` or `sign`).
94//! * `message`: The message to broadcast or send peer to peer.
95//!
96//! Relay a message to all the other peers in the session (broadcast) or send directly to another peer.
97//!
98//! A `message` is treated as peer to peer when the `receiver` field is present which should be the party signup `number` for the peer for a keygen session or the party index for a signing session.
99//!
100//! This method is a notification and does not return anything to the caller.
101//!
102//! ### Session.finish
103//!
104//! * `group_id`: The `String` UUID for the group.
105//! * `session_id`: The `String` UUID for the session.
106//! * `number`: The `u16` party signup number.
107//!
108//! Indicate the session has been finished for the calling client.
109//!
110//! When all the clients in a session have called this method the server will emit a `sessionClosed` event to all the clients in the session.
111//!
112//! This method is a notification and does not return anything to the caller.
113//!
114use async_trait::async_trait;
115use json_rpc2::{futures::*, Error, Request, Response, Result, RpcError};
116use serde::{Deserialize, Serialize};
117use serde_json::Value;
118use std::collections::HashMap;
119use std::sync::Arc;
120use thiserror::Error;
121use tokio::sync::{Mutex, RwLock};
122use uuid::Uuid;
123
124use super::server::{
125    Group, Notification, Parameters, Session, SessionKind, State,
126};
127
128/// Error thrown by the JSON-RPC services.
129#[derive(Debug, Error)]
130pub enum ServiceError {
131    /// Error generated when a parties parameter is too small.
132    #[error("parties must be greater than one")]
133    PartiesTooSmall,
134    /// Error generated when a parties parameter is too small.
135    #[error("threshold must be greater than zero")]
136    ThresholdTooSmall,
137    /// Error generated when the threshold exceeds the parties.
138    #[error("threshold must be less than parties")]
139    ThresholdRange,
140    /// Error generated when a group has enough connections.
141    #[error("group {0} is full, cannot accept new connections")]
142    GroupFull(Uuid),
143    /// Error generated when a group does not exist.
144    #[error("group {0} does not exist")]
145    GroupDoesNotExist(Uuid),
146    /// Error generated when a session does not exist.
147    #[error("group {0} does not exist")]
148    SessionDoesNotExist(Uuid),
149    /// Error generated when a party number does not exist.
150    #[error("party {0} does not exist")]
151    PartyDoesNotExist(u16),
152    /// Error generated when a party number does not belong to the caller.
153    #[error("party {0} is not valid in this context")]
154    BadParty(u16),
155    /// Error generated when the receiver for a peer to peer message
156    /// does not exist.
157    #[error("receiver {0} for peer to peer message does not exist")]
158    BadPeerReceiver(u16),
159    /// Error generated when a client connection does not belong to
160    /// the specified group.
161    #[error("client {0} does not belong to the group {1}")]
162    BadConnection(usize, Uuid),
163}
164
165/// Error data indicating the connection should be closed.
166pub const CLOSE_CONNECTION: &str = "close-connection";
167
168/// Method to create a group.
169pub const GROUP_CREATE: &str = "Group.create";
170/// Method to join a group.
171pub const GROUP_JOIN: &str = "Group.join";
172/// Method to create a session.
173pub const SESSION_CREATE: &str = "Session.create";
174/// Method to join a session.
175pub const SESSION_JOIN: &str = "Session.join";
176/// Method to signup a session.
177pub const SESSION_SIGNUP: &str = "Session.signup";
178/// Method to load a party number into a session.
179pub const SESSION_LOAD: &str = "Session.load";
180/// Register a participant lookup for a signing session.
181pub const SESSION_PARTICIPANT: &str = "Session.participant";
182/// Method to broadcast or relay a message peer to peer.
183pub const SESSION_MESSAGE: &str = "Session.message";
184/// Method to indicate a session is finished.
185pub const SESSION_FINISH: &str = "Session.finish";
186
187/// Notification sent when a session has been created.
188///
189/// Used primarily during key generation so other connected
190/// clients can automatically join the session.
191pub const SESSION_CREATE_EVENT: &str = "sessionCreate";
192/// Notification sent when all expected parties have signed
193/// up to a session.
194pub const SESSION_SIGNUP_EVENT: &str = "sessionSignup";
195/// Notification sent when all parties have loaded a party signup
196/// number into a session.
197pub const SESSION_LOAD_EVENT: &str = "sessionLoad";
198/// Notification sent to clients with broadcast or peer to peer messages.
199pub const SESSION_MESSAGE_EVENT: &str = "sessionMessage";
200/// Notification sent when a session has been marked as finished
201/// by all participating clients.
202pub const SESSION_CLOSED_EVENT: &str = "sessionClosed";
203
204type GroupCreateParams = (String, Parameters);
205type SessionCreateParams = (Uuid, SessionKind, Option<Value>);
206type SessionJoinParams = (Uuid, Uuid, SessionKind);
207type SessionSignupParams = (Uuid, Uuid, SessionKind);
208type SessionLoadParams = (Uuid, Uuid, SessionKind, u16);
209type SessionParticipantParams = (Uuid, Uuid, u16, u16);
210type SessionMessageParams = (Uuid, Uuid, SessionKind, Message);
211type SessionFinishParams = (Uuid, Uuid, u16);
212
213// Mimics the `Msg` struct
214// from `round-based` but doesn't care
215// about the `body` data.
216#[derive(Serialize, Deserialize)]
217struct Message {
218    round: u16,
219    sender: u16,
220    receiver: Option<u16>,
221    uuid: String,
222    body: serde_json::Value,
223}
224
225#[derive(Debug, Serialize)]
226struct Proposal {
227    #[serde(rename = "sessionId")]
228    session_id: Uuid,
229    #[serde(rename = "proposalId")]
230    proposal_id: String,
231    message: String,
232}
233
234/// Service for replying to client requests.
235pub struct ServiceHandler;
236
237#[async_trait]
238impl Service for ServiceHandler {
239    type Data = (usize, Arc<RwLock<State>>, Arc<Mutex<Option<Notification>>>);
240
241    async fn handle(
242        &self,
243        req: &Request,
244        ctx: &Self::Data,
245    ) -> Result<Option<Response>> {
246        let response = match req.method() {
247            GROUP_CREATE => {
248                let (conn_id, state, _) = ctx;
249                let params: GroupCreateParams = req.deserialize()?;
250                let (label, parameters) = params;
251
252                // If parties is less than two then may as well
253                // use a standard single-party ECDSA private key
254                if parameters.parties <= 1 {
255                    return Err(Error::from(Box::from(
256                        ServiceError::PartiesTooSmall,
257                    )));
258                // If threshold is zero then it only
259                // takes a single party to sign a request which
260                // defeats the point of MPC
261                } else if parameters.threshold == 0 {
262                    return Err(Error::from(Box::from(
263                        ServiceError::ThresholdTooSmall,
264                    )));
265                // Threshold must be in range `(t + 1) <= n`
266                } else if parameters.threshold >= parameters.parties {
267                    return Err(Error::from(Box::from(
268                        ServiceError::ThresholdRange,
269                    )));
270                }
271
272                let group =
273                    Group::new(*conn_id, parameters.clone(), label.clone());
274                let res = serde_json::to_value(group.uuid).unwrap();
275                let mut writer = state.write().await;
276                writer.groups.insert(group.uuid, group);
277                Some((req, res).into())
278            }
279            GROUP_JOIN => {
280                let (conn_id, state, _) = ctx;
281                let group_id: Uuid = req.deserialize()?;
282                let mut writer = state.write().await;
283                if let Some(group) = writer.groups.get_mut(&group_id) {
284                    if group.clients.len() == group.params.parties as usize {
285                        let error = ServiceError::GroupFull(group_id);
286                        let err = RpcError::new(
287                            error.to_string(),
288                            Some(CLOSE_CONNECTION.to_string()),
289                        );
290                        Some((req, err).into())
291                    } else {
292                        if !group.clients.iter().any(|c| c == conn_id) {
293                            group.clients.push(*conn_id);
294                        }
295                        let res = serde_json::to_value(group).unwrap();
296                        Some((req, res).into())
297                    }
298                } else {
299                    return Err(Error::from(Box::from(
300                        ServiceError::GroupDoesNotExist(group_id),
301                    )));
302                }
303            }
304            SESSION_CREATE => {
305                let (conn_id, state, notification) = ctx;
306                let params: SessionCreateParams = req.deserialize()?;
307                let (group_id, kind, value) = params;
308                let mut writer = state.write().await;
309                let group =
310                    get_group_mut(conn_id, &group_id, &mut writer.groups)?;
311                let session = Session::from((kind.clone(), value));
312                let key = session.uuid;
313                group.sessions.insert(key, session.clone());
314
315                if let SessionKind::Keygen = kind {
316                    let value =
317                        serde_json::to_value((SESSION_CREATE_EVENT, &session))
318                            .unwrap();
319                    let response: Response = value.into();
320
321                    // Notify everyone else in the group a session was created
322                    let ctx = Notification::Group {
323                        group_id,
324                        filter: Some(vec![*conn_id]),
325                        response,
326                    };
327                    let mut writer = notification.lock().await;
328                    *writer = Some(ctx);
329                }
330
331                let res = serde_json::to_value(&session).unwrap();
332                Some((req, res).into())
333            }
334            SESSION_JOIN => {
335                let (conn_id, state, _) = ctx;
336                let params: SessionJoinParams = req.deserialize()?;
337                let (group_id, session_id, _kind) = params;
338
339                let mut writer = state.write().await;
340                let group =
341                    get_group_mut(conn_id, &group_id, &mut writer.groups)?;
342                if let Some(session) = group.sessions.get_mut(&session_id) {
343                    let res = serde_json::to_value(&session).unwrap();
344                    Some((req, res).into())
345                } else {
346                    return Err(Error::from(Box::from(
347                        ServiceError::SessionDoesNotExist(session_id),
348                    )));
349                }
350            }
351            SESSION_SIGNUP => {
352                let (conn_id, state, notification) = ctx;
353                let params: SessionSignupParams = req.deserialize()?;
354                let (group_id, session_id, kind) = params;
355
356                let mut writer = state.write().await;
357                let group =
358                    get_group_mut(conn_id, &group_id, &mut writer.groups)?;
359                if let Some(session) = group.sessions.get_mut(&session_id) {
360                    let party_number = session.signup(*conn_id);
361
362                    tracing::info!(party_number, "session signup {}", conn_id);
363
364                    // Enough parties are signed up to the session
365                    if threshold(
366                        &kind,
367                        &group.params,
368                        session.party_signups.len(),
369                    ) {
370                        let value = serde_json::to_value((
371                            SESSION_SIGNUP_EVENT,
372                            &session_id,
373                        ))
374                        .unwrap();
375                        let response: Response = value.into();
376                        let ctx = Notification::Session {
377                            group_id,
378                            session_id,
379                            filter: None,
380                            response,
381                        };
382
383                        let mut writer = notification.lock().await;
384                        *writer = Some(ctx);
385                    }
386
387                    let res = serde_json::to_value(party_number).unwrap();
388                    Some((req, res).into())
389                } else {
390                    return Err(Error::from(Box::from(
391                        ServiceError::SessionDoesNotExist(session_id),
392                    )));
393                }
394            }
395            // Load an existing party signup into the session
396            // this is used to support loading existing key shares.
397            SESSION_LOAD => {
398                let (conn_id, state, notification) = ctx;
399                let params: SessionLoadParams = req.deserialize()?;
400                let (group_id, session_id, kind, party_number) = params;
401
402                let mut writer = state.write().await;
403                let group =
404                    get_group_mut(conn_id, &group_id, &mut writer.groups)?;
405                if let Some(session) = group.sessions.get_mut(&session_id) {
406                    let res = serde_json::to_value(party_number).unwrap();
407                    match session.load(&group.params, *conn_id, party_number) {
408                        Ok(_) => {
409                            // Enough parties are loaded into the session
410                            if threshold(
411                                &kind,
412                                &group.params,
413                                session.party_signups.len(),
414                            ) {
415                                let value = serde_json::to_value((
416                                    SESSION_LOAD_EVENT,
417                                    &session_id,
418                                ))
419                                .unwrap();
420                                let response: Response = value.into();
421                                let ctx = Notification::Session {
422                                    group_id,
423                                    session_id,
424                                    filter: None,
425                                    response,
426                                };
427                                let mut writer = notification.lock().await;
428                                *writer = Some(ctx);
429                            }
430
431                            Some((req, res).into())
432                        }
433                        Err(err) => return Err(Error::from(Box::from(err))),
434                    }
435                } else {
436                    return Err(Error::from(Box::from(
437                        ServiceError::SessionDoesNotExist(session_id),
438                    )));
439                }
440            }
441            // Register a participant lookup for a signing session.
442            //
443            // This allows clients to map the receiver party index to a
444            // server issued party number which in turn allows the server
445            // to correctly resolve the connection identifier that we need
446            // to relay for peer to peer rounds.
447            SESSION_PARTICIPANT => {
448                let (conn_id, state, _) = ctx;
449                let params: SessionParticipantParams = req.deserialize()?;
450                let (group_id, session_id, party_index, party_number) = params;
451
452                let mut writer = state.write().await;
453                let group =
454                    get_group_mut(conn_id, &group_id, &mut writer.groups)?;
455                if let Some(session) = group.sessions.get_mut(&session_id) {
456                    session.participants.insert(party_index, party_number);
457                    Some(req.into())
458                } else {
459                    return Err(Error::from(Box::from(
460                        ServiceError::SessionDoesNotExist(session_id),
461                    )));
462                }
463            }
464            // Mark the session as finished for a party.
465            SESSION_FINISH => {
466                let (conn_id, state, notification) = ctx;
467                let params: SessionFinishParams = req.deserialize()?;
468                let (group_id, session_id, party_number) = params;
469
470                let mut writer = state.write().await;
471                let group =
472                    get_group_mut(conn_id, &group_id, &mut writer.groups)?;
473                if let Some(session) = group.sessions.get_mut(&session_id) {
474                    let existing_signup = session
475                        .party_signups
476                        .iter()
477                        .find(|(s, _)| s == &party_number);
478
479                    if let Some((_, conn)) = existing_signup {
480                        // The party number must belong to the caller
481                        // which we check by comparing connection identifiers
482                        if conn != conn_id {
483                            return Err(Error::from(Box::from(
484                                ServiceError::BadParty(party_number),
485                            )));
486                        }
487
488                        session.finished.insert(party_number);
489
490                        let mut signups = session
491                            .party_signups
492                            .iter()
493                            .map(|(n, _)| *n)
494                            .collect::<Vec<u16>>();
495                        let mut completed = session
496                            .finished
497                            .iter()
498                            .cloned()
499                            .collect::<Vec<u16>>();
500
501                        signups.sort();
502                        completed.sort();
503
504                        if signups == completed {
505                            let value = serde_json::to_value((
506                                SESSION_CLOSED_EVENT,
507                                completed,
508                            ))
509                            .unwrap();
510                            let response: Response = value.into();
511
512                            let ctx = Notification::Session {
513                                group_id,
514                                session_id,
515                                filter: None,
516                                response,
517                            };
518
519                            let mut writer = notification.lock().await;
520                            *writer = Some(ctx);
521                        }
522
523                        Some(req.into())
524                    } else {
525                        return Err(Error::from(Box::from(
526                            ServiceError::PartyDoesNotExist(party_number),
527                        )));
528                    }
529                } else {
530                    return Err(Error::from(Box::from(
531                        ServiceError::SessionDoesNotExist(session_id),
532                    )));
533                }
534            }
535            SESSION_MESSAGE => {
536                let (conn_id, state, notification) = ctx;
537                let params: SessionMessageParams = req.deserialize()?;
538                let (group_id, session_id, _kind, msg) = params;
539
540                let reader = state.read().await;
541
542                // Check we have valid group / session
543                let (_group, session) = get_group_session(
544                    conn_id,
545                    &group_id,
546                    &session_id,
547                    &reader.groups,
548                )?;
549
550                // Send direct to peer
551                if let Some(receiver) = &msg.receiver {
552                    if let Some(s) = session.resolve(*receiver) {
553                        let value =
554                            serde_json::to_value((SESSION_MESSAGE_EVENT, msg))
555                                .unwrap();
556
557                        let response: Response = value.into();
558                        let message = (s.1, response);
559
560                        let ctx = Notification::Relay {
561                            messages: vec![message],
562                        };
563
564                        let mut writer = notification.lock().await;
565                        *writer = Some(ctx);
566                    } else {
567                        return Err(Error::from(Box::from(
568                            ServiceError::BadPeerReceiver(*receiver),
569                        )));
570                    }
571                // Handle broadcast round
572                } else {
573                    let value =
574                        serde_json::to_value((SESSION_MESSAGE_EVENT, msg))
575                            .unwrap();
576                    let response: Response = value.clone().into();
577
578                    let ctx = Notification::Session {
579                        group_id,
580                        session_id,
581                        filter: Some(vec![*conn_id]),
582                        response,
583                    };
584
585                    let mut writer = notification.lock().await;
586                    *writer = Some(ctx);
587                }
588
589                // Must ACK so we indicate the service method exists
590                Some(req.into())
591            }
592            _ => None,
593        };
594        Ok(response)
595    }
596}
597
598fn get_group_mut<'a>(
599    conn_id: &usize,
600    group_id: &Uuid,
601    groups: &'a mut HashMap<Uuid, Group>,
602) -> Result<&'a mut Group> {
603    if let Some(group) = groups.get_mut(group_id) {
604        // Verify connection is part of the group clients
605        if group.clients.iter().any(|c| c == conn_id) {
606            Ok(group)
607        } else {
608            Err(Error::from(Box::from(ServiceError::BadConnection(
609                *conn_id, *group_id,
610            ))))
611        }
612    } else {
613        Err(Error::from(Box::from(ServiceError::GroupDoesNotExist(
614            *group_id,
615        ))))
616    }
617}
618
619fn get_group<'a>(
620    conn_id: &usize,
621    group_id: &Uuid,
622    groups: &'a HashMap<Uuid, Group>,
623) -> Result<&'a Group> {
624    if let Some(group) = groups.get(group_id) {
625        // Verify connection is part of the group clients
626        if group.clients.iter().any(|c| c == conn_id) {
627            Ok(group)
628        } else {
629            Err(Error::from(Box::from(ServiceError::BadConnection(
630                *conn_id, *group_id,
631            ))))
632        }
633    } else {
634        Err(Error::from(Box::from(ServiceError::GroupDoesNotExist(
635            *group_id,
636        ))))
637    }
638}
639
640fn get_group_session<'a>(
641    conn_id: &usize,
642    group_id: &Uuid,
643    session_id: &Uuid,
644    groups: &'a HashMap<Uuid, Group>,
645) -> Result<(&'a Group, &'a Session)> {
646    let group = get_group(conn_id, group_id, groups)?;
647    if let Some(session) = group.sessions.get(session_id) {
648        Ok((group, session))
649    } else {
650        Err(Error::from(Box::from(ServiceError::SessionDoesNotExist(
651            *session_id,
652        ))))
653    }
654}
655
656/// Helper to determine if we met a session party threshold.
657fn threshold(
658    kind: &SessionKind,
659    params: &Parameters,
660    num_entries: usize,
661) -> bool {
662    let parties = params.parties as usize;
663    let threshold = params.threshold as usize;
664    let required_num_entries = match kind {
665        SessionKind::Keygen => parties,
666        SessionKind::Sign => threshold + 1,
667    };
668    num_entries == required_num_entries
669}