1use crate::encoding::read;
2use crate::encoding::read::Cursor;
3use crate::error::UpdateError;
4use crate::sync::{awareness, Awareness, AwarenessUpdate};
5use crate::updates::decoder::{Decode, Decoder, DecoderV1};
6use crate::updates::encoder::{Encode, Encoder};
7use crate::{ReadTxn, StateVector, Update};
8use async_trait::async_trait;
9use smallvec::{smallvec, SmallVec};
10use thiserror::Error;
11#[derive(Debug, Copy, Clone, Default)]
38pub struct DefaultProtocol;
39
40impl Protocol for DefaultProtocol {}
41
42#[cfg_attr(not(feature = "sync"), async_trait(?Send))]
43#[cfg_attr(feature = "sync", async_trait)]
44impl AsyncProtocol for DefaultProtocol {}
45
46pub trait Protocol {
50 fn start<E>(&self, awareness: &Awareness, encoder: &mut E) -> Result<(), Error>
54 where
55 E: Encoder,
56 {
57 use crate::Transact;
58 let (sv, update) = {
59 let sv = awareness.doc().transact().state_vector();
60 let update = awareness.update()?;
61 (sv, update)
62 };
63 Message::Sync(SyncMessage::SyncStep1(sv)).encode(encoder);
64 Message::Awareness(update).encode(encoder);
65 Ok(())
66 }
67
68 fn handle(&self, awareness: &Awareness, data: &[u8]) -> Result<SmallVec<[Message; 1]>, Error> {
70 let mut decoder = DecoderV1::new(Cursor::new(data));
71 let mut reader = MessageReader::new(&mut decoder);
72 let mut responses = SmallVec::new();
73 while let Some(result) = reader.next() {
74 let message = result?;
75 if let Some(response) = self.handle_message(awareness, message)? {
76 responses.push(response);
77 }
78 }
79 Ok(responses)
80 }
81
82 fn handle_message(
85 &self,
86 awareness: &Awareness,
87 message: Message,
88 ) -> Result<Option<Message>, Error> {
89 match message {
90 Message::Sync(SyncMessage::SyncStep1(state_vector)) => {
91 self.handle_sync_step1(awareness, state_vector)
92 }
93 Message::Sync(SyncMessage::SyncStep2(update)) => {
94 let update = Update::decode_v1(&update)?;
95 self.handle_sync_step2(awareness, update)
96 }
97 Message::Sync(SyncMessage::Update(update)) => {
98 let update = Update::decode_v1(&update)?;
99 self.handle_update(awareness, update)
100 }
101 Message::Auth(deny_reason) => self.handle_auth(awareness, deny_reason),
102 Message::AwarenessQuery => self.handle_awareness_query(awareness),
103 Message::Awareness(update) => self.handle_awareness_update(awareness, update),
104 Message::Custom(tag, data) => self.missing_handle(awareness, tag, data),
105 }
106 }
107
108 fn handle_sync_step1(
111 &self,
112 awareness: &Awareness,
113 sv: StateVector,
114 ) -> Result<Option<Message>, Error> {
115 use crate::Transact;
116 let update = awareness.doc().transact().encode_state_as_update_v1(&sv);
117 Ok(Some(Message::Sync(SyncMessage::SyncStep2(update))))
118 }
119
120 fn handle_sync_step2(
123 &self,
124 awareness: &Awareness,
125 update: Update,
126 ) -> Result<Option<Message>, Error> {
127 use crate::Transact;
128 let mut txn = awareness.doc().transact_mut();
129 txn.apply_update(update)?;
130 Ok(None)
131 }
132
133 fn handle_update(
136 &self,
137 awareness: &Awareness,
138 update: Update,
139 ) -> Result<Option<Message>, Error> {
140 self.handle_sync_step2(awareness, update)
141 }
142
143 fn handle_auth(
146 &self,
147 _awareness: &Awareness,
148 deny_reason: Option<String>,
149 ) -> Result<Option<Message>, Error> {
150 if let Some(reason) = deny_reason {
151 Err(Error::PermissionDenied { reason })
152 } else {
153 Ok(None)
154 }
155 }
156
157 fn handle_awareness_query(&self, awareness: &Awareness) -> Result<Option<Message>, Error> {
160 let update = awareness.update()?;
161 Ok(Some(Message::Awareness(update)))
162 }
163
164 fn handle_awareness_update(
167 &self,
168 awareness: &Awareness,
169 update: AwarenessUpdate,
170 ) -> Result<Option<Message>, Error> {
171 awareness.apply_update(update)?;
172 Ok(None)
173 }
174
175 fn missing_handle(
178 &self,
179 _awareness: &Awareness,
180 tag: u8,
181 _data: Vec<u8>,
182 ) -> Result<Option<Message>, Error> {
183 Err(Error::Unsupported(tag))
184 }
185}
186
187#[cfg_attr(not(feature = "sync"), async_trait(?Send))]
191#[cfg_attr(feature = "sync", async_trait)]
192pub trait AsyncProtocol {
193 async fn start<E>(&self, awareness: &Awareness) -> Result<SmallVec<[Message; 1]>, Error>
196 where
197 E: Encoder,
198 {
199 use crate::AsyncTransact;
200 let (sv, update) = {
201 let update = awareness.update()?;
202 let txn = awareness.doc().transact().await;
203 let sv = txn.state_vector();
204 (sv, update)
205 };
206 Ok(smallvec![
207 Message::Sync(SyncMessage::SyncStep1(sv)),
208 Message::Awareness(update),
209 ])
210 }
211
212 async fn handle(
214 &self,
215 awareness: &Awareness,
216 data: &[u8],
217 ) -> Result<SmallVec<[Message; 1]>, Error> {
218 let mut decoder = DecoderV1::new(Cursor::new(data));
219 let mut reader = MessageReader::new(&mut decoder);
220 let mut responses = SmallVec::new();
221 while let Some(result) = reader.next() {
222 let message = result?;
223 if let Some(response) = self.handle_message(awareness, message).await? {
224 responses.push(response);
225 }
226 }
227 Ok(responses)
228 }
229
230 async fn handle_message(
233 &self,
234 awareness: &Awareness,
235 message: Message,
236 ) -> Result<Option<Message>, Error> {
237 match message {
238 Message::Sync(SyncMessage::SyncStep1(state_vector)) => {
239 self.handle_sync_step1(awareness, state_vector).await
240 }
241 Message::Sync(SyncMessage::SyncStep2(update)) => {
242 let update = Update::decode_v1(&update)?;
243 self.handle_sync_step2(awareness, update).await
244 }
245 Message::Sync(SyncMessage::Update(update)) => {
246 let update = Update::decode_v1(&update)?;
247 self.handle_update(awareness, update).await
248 }
249 Message::Auth(deny_reason) => self.handle_auth(awareness, deny_reason).await,
250 Message::AwarenessQuery => self.handle_awareness_query(awareness).await,
251 Message::Awareness(update) => self.handle_awareness_update(awareness, update).await,
252 Message::Custom(tag, data) => self.missing_handle(awareness, tag, data).await,
253 }
254 }
255
256 async fn handle_sync_step1(
259 &self,
260 awareness: &Awareness,
261 sv: StateVector,
262 ) -> Result<Option<Message>, Error> {
263 use crate::AsyncTransact;
264 let txn = awareness.doc().transact().await;
265 let update = txn.encode_state_as_update_v1(&sv);
266 Ok(Some(Message::Sync(SyncMessage::SyncStep2(update))))
267 }
268
269 async fn handle_sync_step2(
272 &self,
273 awareness: &Awareness,
274 update: Update,
275 ) -> Result<Option<Message>, Error> {
276 use crate::AsyncTransact;
277 let mut txn = awareness.doc().transact_mut().await;
278 txn.apply_update(update)?;
279 Ok(None)
280 }
281
282 async fn handle_update(
285 &self,
286 awareness: &Awareness,
287 update: Update,
288 ) -> Result<Option<Message>, Error> {
289 self.handle_sync_step2(awareness, update).await
290 }
291
292 async fn handle_auth(
295 &self,
296 _awareness: &Awareness,
297 deny_reason: Option<String>,
298 ) -> Result<Option<Message>, Error> {
299 if let Some(reason) = deny_reason {
300 Err(Error::PermissionDenied { reason })
301 } else {
302 Ok(None)
303 }
304 }
305
306 async fn handle_awareness_query(
309 &self,
310 awareness: &Awareness,
311 ) -> Result<Option<Message>, Error> {
312 let update = awareness.update()?;
313 Ok(Some(Message::Awareness(update)))
314 }
315
316 async fn handle_awareness_update(
319 &self,
320 awareness: &Awareness,
321 update: AwarenessUpdate,
322 ) -> Result<Option<Message>, Error> {
323 awareness.apply_update(update)?;
324 Ok(None)
325 }
326
327 async fn missing_handle(
330 &self,
331 _awareness: &Awareness,
332 tag: u8,
333 _data: Vec<u8>,
334 ) -> Result<Option<Message>, Error> {
335 Err(Error::Unsupported(tag))
336 }
337}
338
339pub const MSG_SYNC: u8 = 0;
341pub const MSG_AWARENESS: u8 = 1;
343pub const MSG_AUTH: u8 = 2;
345pub const MSG_QUERY_AWARENESS: u8 = 3;
347
348pub const PERMISSION_DENIED: u8 = 0;
349pub const PERMISSION_GRANTED: u8 = 1;
350
351#[derive(Debug, Clone, Eq, PartialEq)]
352pub enum Message {
353 Sync(SyncMessage),
354 Auth(Option<String>),
355 AwarenessQuery,
356 Awareness(AwarenessUpdate),
357 Custom(u8, Vec<u8>),
358}
359
360impl Encode for Message {
361 fn encode<E: Encoder>(&self, encoder: &mut E) {
362 match self {
363 Message::Sync(msg) => {
364 encoder.write_var(MSG_SYNC);
365 msg.encode(encoder);
366 }
367 Message::Auth(reason) => {
368 encoder.write_var(MSG_AUTH);
369 if let Some(reason) = reason {
370 encoder.write_var(PERMISSION_DENIED);
371 encoder.write_string(&reason);
372 } else {
373 encoder.write_var(PERMISSION_GRANTED);
374 }
375 }
376 Message::AwarenessQuery => {
377 encoder.write_var(MSG_QUERY_AWARENESS);
378 }
379 Message::Awareness(update) => {
380 encoder.write_var(MSG_AWARENESS);
381 encoder.write_buf(&update.encode_v1())
382 }
383 Message::Custom(tag, data) => {
384 encoder.write_u8(*tag);
385 encoder.write_buf(&data);
386 }
387 }
388 }
389}
390
391impl Decode for Message {
392 fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, read::Error> {
393 let tag: u8 = decoder.read_var()?;
394 match tag {
395 MSG_SYNC => {
396 let msg = SyncMessage::decode(decoder)?;
397 Ok(Message::Sync(msg))
398 }
399 MSG_AWARENESS => {
400 let data = decoder.read_buf()?;
401 let update = AwarenessUpdate::decode_v1(data)?;
402 Ok(Message::Awareness(update))
403 }
404 MSG_AUTH => {
405 let reason = if decoder.read_var::<u8>()? == PERMISSION_DENIED {
406 Some(decoder.read_string()?.to_string())
407 } else {
408 None
409 };
410 Ok(Message::Auth(reason))
411 }
412 MSG_QUERY_AWARENESS => Ok(Message::AwarenessQuery),
413 tag => {
414 let data = decoder.read_buf()?;
415 Ok(Message::Custom(tag, data.to_vec()))
416 }
417 }
418 }
419}
420
421pub const MSG_SYNC_STEP_1: u8 = 0;
423pub const MSG_SYNC_STEP_2: u8 = 1;
425pub const MSG_SYNC_UPDATE: u8 = 2;
427
428#[derive(Debug, Clone, PartialEq, Eq)]
429pub enum SyncMessage {
430 SyncStep1(StateVector),
431 SyncStep2(Vec<u8>),
432 Update(Vec<u8>),
433}
434
435impl Encode for SyncMessage {
436 fn encode<E: Encoder>(&self, encoder: &mut E) {
437 match self {
438 SyncMessage::SyncStep1(sv) => {
439 encoder.write_var(MSG_SYNC_STEP_1);
440 encoder.write_buf(sv.encode_v1());
441 }
442 SyncMessage::SyncStep2(u) => {
443 encoder.write_var(MSG_SYNC_STEP_2);
444 encoder.write_buf(u);
445 }
446 SyncMessage::Update(u) => {
447 encoder.write_var(MSG_SYNC_UPDATE);
448 encoder.write_buf(u);
449 }
450 }
451 }
452}
453
454impl Decode for SyncMessage {
455 fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, read::Error> {
456 let tag: u8 = decoder.read_var()?;
457 match tag {
458 MSG_SYNC_STEP_1 => {
459 let buf = decoder.read_buf()?;
460 let sv = StateVector::decode_v1(buf)?;
461 Ok(SyncMessage::SyncStep1(sv))
462 }
463 MSG_SYNC_STEP_2 => {
464 let buf = decoder.read_buf()?;
465 Ok(SyncMessage::SyncStep2(buf.into()))
466 }
467 MSG_SYNC_UPDATE => {
468 let buf = decoder.read_buf()?;
469 Ok(SyncMessage::Update(buf.into()))
470 }
471 _ => Err(read::Error::UnexpectedValue),
472 }
473 }
474}
475
476#[derive(Debug, Error)]
478pub enum Error {
479 #[error("failed to deserialize message: {0}")]
481 DecodingError(#[from] read::Error),
482
483 #[error("failed to process awareness update: {0}")]
485 AwarenessEncoding(#[from] awareness::Error),
486
487 #[error("permission denied to access: {reason}")]
489 PermissionDenied { reason: String },
490
491 #[error("unsupported message tag identifier: {0}")]
493 Unsupported(u8),
494
495 #[error("IO error: {0}")]
497 IO(#[from] std::io::Error),
498
499 #[error("failed to apply update: {0}")]
500 Update(#[from] UpdateError),
501
502 #[error("internal failure: {0}")]
504 Other(#[from] Box<dyn std::error::Error + Send + Sync>),
505}
506
507pub struct MessageReader<'a, D: Decoder>(&'a mut D);
511
512impl<'a, D: Decoder> MessageReader<'a, D> {
513 pub fn new(decoder: &'a mut D) -> Self {
514 MessageReader(decoder)
515 }
516}
517
518impl<'a, D: Decoder> Iterator for MessageReader<'a, D> {
519 type Item = Result<Message, read::Error>;
520
521 fn next(&mut self) -> Option<Self::Item> {
522 match Message::decode(self.0) {
523 Ok(msg) => Some(Ok(msg)),
524 Err(read::Error::EndOfBuffer(_)) => None,
525 Err(error) => Some(Err(error)),
526 }
527 }
528}
529
530#[cfg(test)]
531mod test {
532 use crate::encoding::read::Cursor;
533 use crate::sync::protocol::MessageReader;
534 use crate::sync::{Awareness, Protocol};
535 use crate::updates::decoder::{Decode, DecoderV1};
536 use crate::updates::encoder::{Encode, Encoder, EncoderV1};
537 use crate::{Doc, GetString, ReadTxn, StateVector, Text, Transact, Update};
538 use serde_json::json;
539 use std::collections::HashMap;
540
541 #[test]
542 fn message_encoding() {
543 let doc = Doc::new();
544 let txt = doc.get_or_insert_text("text");
545 txt.push(&mut doc.transact_mut(), "hello world");
546 let awareness = Awareness::new(doc);
547 awareness
548 .set_local_state(json!({
549 "user":{
550 "name": "Anonymous 50",
551 "color": "#30bced",
552 "colorLight": "#30bced33"
553 }
554 }))
555 .unwrap();
556
557 let messages = [
558 crate::sync::Message::Sync(crate::sync::SyncMessage::SyncStep1(
559 awareness.doc().transact().state_vector(),
560 )),
561 crate::sync::Message::Sync(crate::sync::SyncMessage::SyncStep2(
562 awareness
563 .doc()
564 .transact()
565 .encode_state_as_update_v1(&StateVector::default()),
566 )),
567 crate::sync::Message::Awareness(awareness.update().unwrap()),
568 crate::sync::Message::Auth(Some(
569 "reason
570 }"
571 .to_string(),
572 )),
573 crate::sync::Message::AwarenessQuery,
574 ];
575
576 for msg in messages {
577 let encoded = msg.encode_v1();
578 let decoded = crate::sync::Message::decode_v1(&encoded)
579 .expect(&format!("failed to decode {:?}", msg));
580 assert_eq!(decoded, msg);
581 }
582 }
583
584 #[test]
585 fn protocol_init() {
586 let awareness = Awareness::default();
587 let protocol = crate::sync::DefaultProtocol;
588 let mut encoder = EncoderV1::new();
589 protocol.start(&awareness, &mut encoder).unwrap();
590 let data = encoder.to_vec();
591 let mut decoder = DecoderV1::new(Cursor::new(&data));
592 let mut reader = MessageReader::new(&mut decoder);
593
594 assert_eq!(
595 reader.next().unwrap().unwrap(),
596 crate::sync::Message::Sync(crate::sync::SyncMessage::SyncStep1(StateVector::default()))
597 );
598
599 assert_eq!(
600 reader.next().unwrap().unwrap(),
601 crate::sync::Message::Awareness(awareness.update().unwrap())
602 );
603
604 assert!(reader.next().is_none());
605 }
606
607 #[test]
608 fn protocol_sync_steps() {
609 let protocol = crate::sync::DefaultProtocol;
610
611 let mut a1 = Awareness::new(Doc::with_client_id(1));
612 let mut a2 = Awareness::new(Doc::with_client_id(2));
613
614 let expected = {
615 let txt = a1.doc_mut().get_or_insert_text("test");
616 let mut txn = a1.doc_mut().transact_mut();
617 txt.push(&mut txn, "hello");
618 txn.encode_state_as_update_v1(&StateVector::default())
619 };
620
621 let result = protocol
622 .handle_sync_step1(&a1, a2.doc().transact().state_vector())
623 .unwrap();
624
625 assert_eq!(
626 result,
627 Some(crate::sync::Message::Sync(
628 crate::sync::SyncMessage::SyncStep2(expected)
629 ))
630 );
631
632 if let Some(crate::sync::Message::Sync(crate::sync::SyncMessage::SyncStep2(u))) = result {
633 let result2 = protocol
634 .handle_sync_step2(&mut a2, Update::decode_v1(&u).unwrap())
635 .unwrap();
636
637 assert!(result2.is_none());
638 }
639
640 let txt = a2.doc().transact().get_text("test").unwrap();
641 assert_eq!(txt.get_string(&a2.doc().transact()), "hello".to_owned());
642 }
643
644 #[test]
645 fn protocol_sync_step_update() {
646 let protocol = crate::sync::DefaultProtocol;
647
648 let mut a1 = Awareness::new(Doc::with_client_id(1));
649 let mut a2 = Awareness::new(Doc::with_client_id(2));
650
651 let data = {
652 let txt = a1.doc_mut().get_or_insert_text("test");
653 let mut txn = a1.doc_mut().transact_mut();
654 txt.push(&mut txn, "hello");
655 txn.encode_update_v1()
656 };
657
658 let result = protocol
659 .handle_update(&mut a2, Update::decode_v1(&data).unwrap())
660 .unwrap();
661
662 assert!(result.is_none());
663
664 let txt = a2.doc().transact().get_text("test").unwrap();
665 assert_eq!(txt.get_string(&a2.doc().transact()), "hello".to_owned());
666 }
667
668 #[test]
669 fn protocol_awareness_sync() {
670 let protocol = crate::sync::DefaultProtocol;
671
672 let a1 = Awareness::new(Doc::with_client_id(1));
673 let a2 = Awareness::new(Doc::with_client_id(2));
674
675 a1.set_local_state(json!({"x":3})).unwrap();
676 let result = protocol.handle_awareness_query(&a1).unwrap();
677
678 assert_eq!(
679 result,
680 Some(crate::sync::Message::Awareness(a1.update().unwrap()))
681 );
682
683 if let Some(crate::sync::Message::Awareness(u)) = result {
684 let result = protocol.handle_awareness_update(&a2, u).unwrap();
685 assert!(result.is_none());
686 }
687
688 let a2_clients: HashMap<_, _> = a2
689 .iter()
690 .flat_map(|(id, state)| state.data.map(|data| (id, data)))
691 .collect();
692 assert_eq!(a2_clients, HashMap::from([(1, "{\"x\":3}".into())]));
693 }
694}