yrs/sync/
protocol.rs

1use crate::encoding::read;
2use crate::encoding::read::Cursor;
3use crate::error::UpdateError;
4use crate::sync::{awareness, Awareness, AwarenessUpdate};
5use crate::updates::decoder::{Decode, Decoder, DecoderV1};
6use crate::updates::encoder::{Encode, Encoder};
7use crate::{ReadTxn, StateVector, Update};
8use async_trait::async_trait;
9use smallvec::{smallvec, SmallVec};
10use thiserror::Error;
11/*
12 Core Yjs defines two message types:
13 • YjsSyncStep1: Includes the State Set of the sending client. When received, the client should reply with YjsSyncStep2.
14 • YjsSyncStep2: Includes all missing structs and the complete delete set. When received, the client is assured that it
15   received all information from the remote client.
16
17 In a peer-to-peer network, you may want to introduce a SyncDone message type. Both parties should initiate the connection
18 with SyncStep1. When a client received SyncStep2, it should reply with SyncDone. When the local client received both
19 SyncStep2 and SyncDone, it is assured that it is synced to the remote client.
20
21 In a client-server model, you want to handle this differently: The client should initiate the connection with SyncStep1.
22 When the server receives SyncStep1, it should reply with SyncStep2 immediately followed by SyncStep1. The client replies
23 with SyncStep2 when it receives SyncStep1. Optionally the server may send a SyncDone after it received SyncStep2, so the
24 client knows that the sync is finished.  There are two reasons for this more elaborated sync model: 1. This protocol can
25 easily be implemented on top of http and websockets. 2. The server should only reply to requests, and not initiate them.
26 Therefore, it is necessary that the client initiates the sync.
27
28 Construction of a message:
29 [messageType : varUint, message definition..]
30
31 Note: A message does not include information about the room name. This must be handled by the upper layer protocol!
32
33 stringify[messageType] stringifies a message definition (messageType is already read from the buffer)
34*/
35
36/// A default implementation of y-sync [Protocol].
37#[derive(Debug, Copy, Clone, Default)]
38pub struct DefaultProtocol;
39
40impl Protocol for DefaultProtocol {}
41
42#[cfg_attr(not(feature = "sync"), async_trait(?Send))]
43#[cfg_attr(feature = "sync", async_trait)]
44impl AsyncProtocol for DefaultProtocol {}
45
46/// Trait implementing a y-sync protocol. The default implementation can be found in
47/// [DefaultProtocol], but its implementation steps can be potentially changed by the user if
48/// necessary.
49pub trait Protocol {
50    /// To be called whenever a new connection has been accepted. Returns an encoded list of
51    /// messages to be sent back to initiator. This binary may contain multiple messages inside,
52    /// stored one after another.
53    fn start<E>(&self, awareness: &Awareness, encoder: &mut E) -> Result<(), Error>
54    where
55        E: Encoder,
56    {
57        use crate::Transact;
58        let (sv, update) = {
59            let sv = awareness.doc().transact().state_vector();
60            let update = awareness.update()?;
61            (sv, update)
62        };
63        Message::Sync(SyncMessage::SyncStep1(sv)).encode(encoder);
64        Message::Awareness(update).encode(encoder);
65        Ok(())
66    }
67
68    /// Y-sync protocol message handler.
69    fn handle(&self, awareness: &Awareness, data: &[u8]) -> Result<SmallVec<[Message; 1]>, Error> {
70        let mut decoder = DecoderV1::new(Cursor::new(data));
71        let mut reader = MessageReader::new(&mut decoder);
72        let mut responses = SmallVec::new();
73        while let Some(result) = reader.next() {
74            let message = result?;
75            if let Some(response) = self.handle_message(awareness, message)? {
76                responses.push(response);
77            }
78        }
79        Ok(responses)
80    }
81
82    /// Handles incoming y-sync [Message] within the context of current awareness structure.
83    /// Returns an optional reply message that should be sent back to message sender.
84    fn handle_message(
85        &self,
86        awareness: &Awareness,
87        message: Message,
88    ) -> Result<Option<Message>, Error> {
89        match message {
90            Message::Sync(SyncMessage::SyncStep1(state_vector)) => {
91                self.handle_sync_step1(awareness, state_vector)
92            }
93            Message::Sync(SyncMessage::SyncStep2(update)) => {
94                let update = Update::decode_v1(&update)?;
95                self.handle_sync_step2(awareness, update)
96            }
97            Message::Sync(SyncMessage::Update(update)) => {
98                let update = Update::decode_v1(&update)?;
99                self.handle_update(awareness, update)
100            }
101            Message::Auth(deny_reason) => self.handle_auth(awareness, deny_reason),
102            Message::AwarenessQuery => self.handle_awareness_query(awareness),
103            Message::Awareness(update) => self.handle_awareness_update(awareness, update),
104            Message::Custom(tag, data) => self.missing_handle(awareness, tag, data),
105        }
106    }
107
108    /// Y-sync protocol sync-step-1 - given a [StateVector] of a remote side, calculate missing
109    /// updates. Returns a sync-step-2 message containing a calculated update.
110    fn handle_sync_step1(
111        &self,
112        awareness: &Awareness,
113        sv: StateVector,
114    ) -> Result<Option<Message>, Error> {
115        use crate::Transact;
116        let update = awareness.doc().transact().encode_state_as_update_v1(&sv);
117        Ok(Some(Message::Sync(SyncMessage::SyncStep2(update))))
118    }
119
120    /// Handle reply for a sync-step-1 send from this replica previously. By default just apply
121    /// an update to current `awareness` document instance.
122    fn handle_sync_step2(
123        &self,
124        awareness: &Awareness,
125        update: Update,
126    ) -> Result<Option<Message>, Error> {
127        use crate::Transact;
128        let mut txn = awareness.doc().transact_mut();
129        txn.apply_update(update)?;
130        Ok(None)
131    }
132
133    /// Handle continuous update send from the client. By default just apply an update to a current
134    /// `awareness` document instance.
135    fn handle_update(
136        &self,
137        awareness: &Awareness,
138        update: Update,
139    ) -> Result<Option<Message>, Error> {
140        self.handle_sync_step2(awareness, update)
141    }
142
143    /// Handle authorization message. By default if reason for auth denial has been provided,
144    /// send back [Error::PermissionDenied].
145    fn handle_auth(
146        &self,
147        _awareness: &Awareness,
148        deny_reason: Option<String>,
149    ) -> Result<Option<Message>, Error> {
150        if let Some(reason) = deny_reason {
151            Err(Error::PermissionDenied { reason })
152        } else {
153            Ok(None)
154        }
155    }
156
157    /// Returns an [AwarenessUpdate] which is a serializable representation of a current `awareness`
158    /// instance.
159    fn handle_awareness_query(&self, awareness: &Awareness) -> Result<Option<Message>, Error> {
160        let update = awareness.update()?;
161        Ok(Some(Message::Awareness(update)))
162    }
163
164    /// Reply to awareness query or just incoming [AwarenessUpdate], where current `awareness`
165    /// instance is being updated with incoming data.
166    fn handle_awareness_update(
167        &self,
168        awareness: &Awareness,
169        update: AwarenessUpdate,
170    ) -> Result<Option<Message>, Error> {
171        awareness.apply_update(update)?;
172        Ok(None)
173    }
174
175    /// Y-sync protocol enables to extend its own settings with custom handles. These can be
176    /// implemented here. By default, it returns an [Error::Unsupported].
177    fn missing_handle(
178        &self,
179        _awareness: &Awareness,
180        tag: u8,
181        _data: Vec<u8>,
182    ) -> Result<Option<Message>, Error> {
183        Err(Error::Unsupported(tag))
184    }
185}
186
187/// Trait implementing a y-sync protocol using awaitable transaction API. The default implementation
188/// can be found in [DefaultProtocol], but its implementation steps can be potentially changed by
189/// the user if necessary.
190#[cfg_attr(not(feature = "sync"), async_trait(?Send))]
191#[cfg_attr(feature = "sync", async_trait)]
192pub trait AsyncProtocol {
193    /// To be called whenever a new connection has been accepted. Returns a list of
194    /// messages to be sent back to initiator.
195    async fn start<E>(&self, awareness: &Awareness) -> Result<SmallVec<[Message; 1]>, Error>
196    where
197        E: Encoder,
198    {
199        use crate::AsyncTransact;
200        let (sv, update) = {
201            let update = awareness.update()?;
202            let txn = awareness.doc().transact().await;
203            let sv = txn.state_vector();
204            (sv, update)
205        };
206        Ok(smallvec![
207            Message::Sync(SyncMessage::SyncStep1(sv)),
208            Message::Awareness(update),
209        ])
210    }
211
212    /// Y-sync protocol message handler.
213    async fn handle(
214        &self,
215        awareness: &Awareness,
216        data: &[u8],
217    ) -> Result<SmallVec<[Message; 1]>, Error> {
218        let mut decoder = DecoderV1::new(Cursor::new(data));
219        let mut reader = MessageReader::new(&mut decoder);
220        let mut responses = SmallVec::new();
221        while let Some(result) = reader.next() {
222            let message = result?;
223            if let Some(response) = self.handle_message(awareness, message).await? {
224                responses.push(response);
225            }
226        }
227        Ok(responses)
228    }
229
230    /// Handles incoming y-sync [Message] within the context of current awareness structure.
231    /// Returns an optional reply message that should be sent back to message sender.
232    async fn handle_message(
233        &self,
234        awareness: &Awareness,
235        message: Message,
236    ) -> Result<Option<Message>, Error> {
237        match message {
238            Message::Sync(SyncMessage::SyncStep1(state_vector)) => {
239                self.handle_sync_step1(awareness, state_vector).await
240            }
241            Message::Sync(SyncMessage::SyncStep2(update)) => {
242                let update = Update::decode_v1(&update)?;
243                self.handle_sync_step2(awareness, update).await
244            }
245            Message::Sync(SyncMessage::Update(update)) => {
246                let update = Update::decode_v1(&update)?;
247                self.handle_update(awareness, update).await
248            }
249            Message::Auth(deny_reason) => self.handle_auth(awareness, deny_reason).await,
250            Message::AwarenessQuery => self.handle_awareness_query(awareness).await,
251            Message::Awareness(update) => self.handle_awareness_update(awareness, update).await,
252            Message::Custom(tag, data) => self.missing_handle(awareness, tag, data).await,
253        }
254    }
255
256    /// Y-sync protocol sync-step-1 - given a [StateVector] of a remote side, calculate missing
257    /// updates. Returns a sync-step-2 message containing a calculated update.
258    async fn handle_sync_step1(
259        &self,
260        awareness: &Awareness,
261        sv: StateVector,
262    ) -> Result<Option<Message>, Error> {
263        use crate::AsyncTransact;
264        let txn = awareness.doc().transact().await;
265        let update = txn.encode_state_as_update_v1(&sv);
266        Ok(Some(Message::Sync(SyncMessage::SyncStep2(update))))
267    }
268
269    /// Handle reply for a sync-step-1 send from this replica previously. By default just apply
270    /// an update to current `awareness` document instance.
271    async fn handle_sync_step2(
272        &self,
273        awareness: &Awareness,
274        update: Update,
275    ) -> Result<Option<Message>, Error> {
276        use crate::AsyncTransact;
277        let mut txn = awareness.doc().transact_mut().await;
278        txn.apply_update(update)?;
279        Ok(None)
280    }
281
282    /// Handle continuous update send from the client. By default just apply an update to a current
283    /// `awareness` document instance.
284    async fn handle_update(
285        &self,
286        awareness: &Awareness,
287        update: Update,
288    ) -> Result<Option<Message>, Error> {
289        self.handle_sync_step2(awareness, update).await
290    }
291
292    /// Handle authorization message. By default if reason for auth denial has been provided,
293    /// send back [Error::PermissionDenied].
294    async fn handle_auth(
295        &self,
296        _awareness: &Awareness,
297        deny_reason: Option<String>,
298    ) -> Result<Option<Message>, Error> {
299        if let Some(reason) = deny_reason {
300            Err(Error::PermissionDenied { reason })
301        } else {
302            Ok(None)
303        }
304    }
305
306    /// Returns an [AwarenessUpdate] which is a serializable representation of a current `awareness`
307    /// instance.
308    async fn handle_awareness_query(
309        &self,
310        awareness: &Awareness,
311    ) -> Result<Option<Message>, Error> {
312        let update = awareness.update()?;
313        Ok(Some(Message::Awareness(update)))
314    }
315
316    /// Reply to awareness query or just incoming [AwarenessUpdate], where current `awareness`
317    /// instance is being updated with incoming data.
318    async fn handle_awareness_update(
319        &self,
320        awareness: &Awareness,
321        update: AwarenessUpdate,
322    ) -> Result<Option<Message>, Error> {
323        awareness.apply_update(update)?;
324        Ok(None)
325    }
326
327    /// Y-sync protocol enables to extend its own settings with custom handles. These can be
328    /// implemented here. By default it returns an [Error::Unsupported].
329    async fn missing_handle(
330        &self,
331        _awareness: &Awareness,
332        tag: u8,
333        _data: Vec<u8>,
334    ) -> Result<Option<Message>, Error> {
335        Err(Error::Unsupported(tag))
336    }
337}
338
339/// Tag id for [Message::Sync].
340pub const MSG_SYNC: u8 = 0;
341/// Tag id for [Message::Awareness].
342pub const MSG_AWARENESS: u8 = 1;
343/// Tag id for [Message::Auth].
344pub const MSG_AUTH: u8 = 2;
345/// Tag id for [Message::AwarenessQuery].
346pub const MSG_QUERY_AWARENESS: u8 = 3;
347
348pub const PERMISSION_DENIED: u8 = 0;
349pub const PERMISSION_GRANTED: u8 = 1;
350
351#[derive(Debug, Clone, Eq, PartialEq)]
352pub enum Message {
353    Sync(SyncMessage),
354    Auth(Option<String>),
355    AwarenessQuery,
356    Awareness(AwarenessUpdate),
357    Custom(u8, Vec<u8>),
358}
359
360impl Encode for Message {
361    fn encode<E: Encoder>(&self, encoder: &mut E) {
362        match self {
363            Message::Sync(msg) => {
364                encoder.write_var(MSG_SYNC);
365                msg.encode(encoder);
366            }
367            Message::Auth(reason) => {
368                encoder.write_var(MSG_AUTH);
369                if let Some(reason) = reason {
370                    encoder.write_var(PERMISSION_DENIED);
371                    encoder.write_string(&reason);
372                } else {
373                    encoder.write_var(PERMISSION_GRANTED);
374                }
375            }
376            Message::AwarenessQuery => {
377                encoder.write_var(MSG_QUERY_AWARENESS);
378            }
379            Message::Awareness(update) => {
380                encoder.write_var(MSG_AWARENESS);
381                encoder.write_buf(&update.encode_v1())
382            }
383            Message::Custom(tag, data) => {
384                encoder.write_u8(*tag);
385                encoder.write_buf(&data);
386            }
387        }
388    }
389}
390
391impl Decode for Message {
392    fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, read::Error> {
393        let tag: u8 = decoder.read_var()?;
394        match tag {
395            MSG_SYNC => {
396                let msg = SyncMessage::decode(decoder)?;
397                Ok(Message::Sync(msg))
398            }
399            MSG_AWARENESS => {
400                let data = decoder.read_buf()?;
401                let update = AwarenessUpdate::decode_v1(data)?;
402                Ok(Message::Awareness(update))
403            }
404            MSG_AUTH => {
405                let reason = if decoder.read_var::<u8>()? == PERMISSION_DENIED {
406                    Some(decoder.read_string()?.to_string())
407                } else {
408                    None
409                };
410                Ok(Message::Auth(reason))
411            }
412            MSG_QUERY_AWARENESS => Ok(Message::AwarenessQuery),
413            tag => {
414                let data = decoder.read_buf()?;
415                Ok(Message::Custom(tag, data.to_vec()))
416            }
417        }
418    }
419}
420
421/// Tag id for [SyncMessage::SyncStep1].
422pub const MSG_SYNC_STEP_1: u8 = 0;
423/// Tag id for [SyncMessage::SyncStep2].
424pub const MSG_SYNC_STEP_2: u8 = 1;
425/// Tag id for [SyncMessage::Update].
426pub const MSG_SYNC_UPDATE: u8 = 2;
427
428#[derive(Debug, Clone, PartialEq, Eq)]
429pub enum SyncMessage {
430    SyncStep1(StateVector),
431    SyncStep2(Vec<u8>),
432    Update(Vec<u8>),
433}
434
435impl Encode for SyncMessage {
436    fn encode<E: Encoder>(&self, encoder: &mut E) {
437        match self {
438            SyncMessage::SyncStep1(sv) => {
439                encoder.write_var(MSG_SYNC_STEP_1);
440                encoder.write_buf(sv.encode_v1());
441            }
442            SyncMessage::SyncStep2(u) => {
443                encoder.write_var(MSG_SYNC_STEP_2);
444                encoder.write_buf(u);
445            }
446            SyncMessage::Update(u) => {
447                encoder.write_var(MSG_SYNC_UPDATE);
448                encoder.write_buf(u);
449            }
450        }
451    }
452}
453
454impl Decode for SyncMessage {
455    fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, read::Error> {
456        let tag: u8 = decoder.read_var()?;
457        match tag {
458            MSG_SYNC_STEP_1 => {
459                let buf = decoder.read_buf()?;
460                let sv = StateVector::decode_v1(buf)?;
461                Ok(SyncMessage::SyncStep1(sv))
462            }
463            MSG_SYNC_STEP_2 => {
464                let buf = decoder.read_buf()?;
465                Ok(SyncMessage::SyncStep2(buf.into()))
466            }
467            MSG_SYNC_UPDATE => {
468                let buf = decoder.read_buf()?;
469                Ok(SyncMessage::Update(buf.into()))
470            }
471            _ => Err(read::Error::UnexpectedValue),
472        }
473    }
474}
475
476/// An error type returned in response from y-sync [Protocol].
477#[derive(Debug, Error)]
478pub enum Error {
479    /// Incoming Y-protocol message couldn't be deserialized.
480    #[error("failed to deserialize message: {0}")]
481    DecodingError(#[from] read::Error),
482
483    /// Applying incoming Y-protocol awareness update has failed.
484    #[error("failed to process awareness update: {0}")]
485    AwarenessEncoding(#[from] awareness::Error),
486
487    /// An incoming Y-protocol authorization request has been denied.
488    #[error("permission denied to access: {reason}")]
489    PermissionDenied { reason: String },
490
491    /// Thrown whenever an unknown message tag has been sent.
492    #[error("unsupported message tag identifier: {0}")]
493    Unsupported(u8),
494
495    /// Thrown in case of I/O errors.
496    #[error("IO error: {0}")]
497    IO(#[from] std::io::Error),
498
499    #[error("failed to apply update: {0}")]
500    Update(#[from] UpdateError),
501
502    /// Custom dynamic kind of error, usually related to a warp internal error messages.
503    #[error("internal failure: {0}")]
504    Other(#[from] Box<dyn std::error::Error + Send + Sync>),
505}
506
507/// Since y-sync protocol enables for a multiple messages to be packed into a singe byte payload,
508/// [MessageReader] can be used over the decoder to read these messages one by one in iterable
509/// fashion.
510pub struct MessageReader<'a, D: Decoder>(&'a mut D);
511
512impl<'a, D: Decoder> MessageReader<'a, D> {
513    pub fn new(decoder: &'a mut D) -> Self {
514        MessageReader(decoder)
515    }
516}
517
518impl<'a, D: Decoder> Iterator for MessageReader<'a, D> {
519    type Item = Result<Message, read::Error>;
520
521    fn next(&mut self) -> Option<Self::Item> {
522        match Message::decode(self.0) {
523            Ok(msg) => Some(Ok(msg)),
524            Err(read::Error::EndOfBuffer(_)) => None,
525            Err(error) => Some(Err(error)),
526        }
527    }
528}
529
530#[cfg(test)]
531mod test {
532    use crate::encoding::read::Cursor;
533    use crate::sync::protocol::MessageReader;
534    use crate::sync::{Awareness, Protocol};
535    use crate::updates::decoder::{Decode, DecoderV1};
536    use crate::updates::encoder::{Encode, Encoder, EncoderV1};
537    use crate::{Doc, GetString, ReadTxn, StateVector, Text, Transact, Update};
538    use serde_json::json;
539    use std::collections::HashMap;
540
541    #[test]
542    fn message_encoding() {
543        let doc = Doc::new();
544        let txt = doc.get_or_insert_text("text");
545        txt.push(&mut doc.transact_mut(), "hello world");
546        let awareness = Awareness::new(doc);
547        awareness
548            .set_local_state(json!({
549              "user":{
550                "name": "Anonymous 50",
551                "color": "#30bced",
552                "colorLight": "#30bced33"
553              }
554            }))
555            .unwrap();
556
557        let messages = [
558            crate::sync::Message::Sync(crate::sync::SyncMessage::SyncStep1(
559                awareness.doc().transact().state_vector(),
560            )),
561            crate::sync::Message::Sync(crate::sync::SyncMessage::SyncStep2(
562                awareness
563                    .doc()
564                    .transact()
565                    .encode_state_as_update_v1(&StateVector::default()),
566            )),
567            crate::sync::Message::Awareness(awareness.update().unwrap()),
568            crate::sync::Message::Auth(Some(
569                "reason
570            }"
571                .to_string(),
572            )),
573            crate::sync::Message::AwarenessQuery,
574        ];
575
576        for msg in messages {
577            let encoded = msg.encode_v1();
578            let decoded = crate::sync::Message::decode_v1(&encoded)
579                .expect(&format!("failed to decode {:?}", msg));
580            assert_eq!(decoded, msg);
581        }
582    }
583
584    #[test]
585    fn protocol_init() {
586        let awareness = Awareness::default();
587        let protocol = crate::sync::DefaultProtocol;
588        let mut encoder = EncoderV1::new();
589        protocol.start(&awareness, &mut encoder).unwrap();
590        let data = encoder.to_vec();
591        let mut decoder = DecoderV1::new(Cursor::new(&data));
592        let mut reader = MessageReader::new(&mut decoder);
593
594        assert_eq!(
595            reader.next().unwrap().unwrap(),
596            crate::sync::Message::Sync(crate::sync::SyncMessage::SyncStep1(StateVector::default()))
597        );
598
599        assert_eq!(
600            reader.next().unwrap().unwrap(),
601            crate::sync::Message::Awareness(awareness.update().unwrap())
602        );
603
604        assert!(reader.next().is_none());
605    }
606
607    #[test]
608    fn protocol_sync_steps() {
609        let protocol = crate::sync::DefaultProtocol;
610
611        let mut a1 = Awareness::new(Doc::with_client_id(1));
612        let mut a2 = Awareness::new(Doc::with_client_id(2));
613
614        let expected = {
615            let txt = a1.doc_mut().get_or_insert_text("test");
616            let mut txn = a1.doc_mut().transact_mut();
617            txt.push(&mut txn, "hello");
618            txn.encode_state_as_update_v1(&StateVector::default())
619        };
620
621        let result = protocol
622            .handle_sync_step1(&a1, a2.doc().transact().state_vector())
623            .unwrap();
624
625        assert_eq!(
626            result,
627            Some(crate::sync::Message::Sync(
628                crate::sync::SyncMessage::SyncStep2(expected)
629            ))
630        );
631
632        if let Some(crate::sync::Message::Sync(crate::sync::SyncMessage::SyncStep2(u))) = result {
633            let result2 = protocol
634                .handle_sync_step2(&mut a2, Update::decode_v1(&u).unwrap())
635                .unwrap();
636
637            assert!(result2.is_none());
638        }
639
640        let txt = a2.doc().transact().get_text("test").unwrap();
641        assert_eq!(txt.get_string(&a2.doc().transact()), "hello".to_owned());
642    }
643
644    #[test]
645    fn protocol_sync_step_update() {
646        let protocol = crate::sync::DefaultProtocol;
647
648        let mut a1 = Awareness::new(Doc::with_client_id(1));
649        let mut a2 = Awareness::new(Doc::with_client_id(2));
650
651        let data = {
652            let txt = a1.doc_mut().get_or_insert_text("test");
653            let mut txn = a1.doc_mut().transact_mut();
654            txt.push(&mut txn, "hello");
655            txn.encode_update_v1()
656        };
657
658        let result = protocol
659            .handle_update(&mut a2, Update::decode_v1(&data).unwrap())
660            .unwrap();
661
662        assert!(result.is_none());
663
664        let txt = a2.doc().transact().get_text("test").unwrap();
665        assert_eq!(txt.get_string(&a2.doc().transact()), "hello".to_owned());
666    }
667
668    #[test]
669    fn protocol_awareness_sync() {
670        let protocol = crate::sync::DefaultProtocol;
671
672        let a1 = Awareness::new(Doc::with_client_id(1));
673        let a2 = Awareness::new(Doc::with_client_id(2));
674
675        a1.set_local_state(json!({"x":3})).unwrap();
676        let result = protocol.handle_awareness_query(&a1).unwrap();
677
678        assert_eq!(
679            result,
680            Some(crate::sync::Message::Awareness(a1.update().unwrap()))
681        );
682
683        if let Some(crate::sync::Message::Awareness(u)) = result {
684            let result = protocol.handle_awareness_update(&a2, u).unwrap();
685            assert!(result.is_none());
686        }
687
688        let a2_clients: HashMap<_, _> = a2
689            .iter()
690            .flat_map(|(id, state)| state.data.map(|data| (id, data)))
691            .collect();
692        assert_eq!(a2_clients, HashMap::from([(1, "{\"x\":3}".into())]));
693    }
694}