automerge/sync/
state.rs

1use std::collections::BTreeSet;
2
3#[cfg(doc)]
4use super::SyncDoc;
5use super::{encode_hashes, BloomFilter, Capability};
6use crate::storage::parse;
7use crate::ChangeHash;
8
/// Type tag identifying an encoded sync state: it is the first byte written by
/// `State::encode()` and the byte `State::parse()` checks before reading anything else.
const SYNC_STATE_TYPE: u8 = 0x43;
10
/// Errors returned by [`State::decode()`] when the input is not a valid encoded sync state.
#[derive(Debug, thiserror::Error)]
pub enum DecodeError {
    /// The input was malformed (for example an invalid LEB128 integer); the string describes
    /// the problem. Note that the message is rendered with `{:?}`, so it is displayed quoted.
    #[error("{0:?}")]
    Parse(String),
    /// A type byte was read which is not among the accepted values.
    #[error("wrong type: expected one of {expected_one_of:?} but found {found}")]
    WrongType { expected_one_of: Vec<u8>, found: u8 },
    /// The input ended before a complete value could be read.
    #[error("not enough input")]
    NotEnoughInput,
}
20
21impl From<parse::leb128::Error> for DecodeError {
22    fn from(_: parse::leb128::Error) -> Self {
23        Self::Parse("bad leb128 encoding".to_string())
24    }
25}
26
/// The state of synchronisation with a peer.
///
/// This should be persisted using [`Self::encode()`] when you know you will be interacting with the
/// same peer in multiple sessions. [`Self::encode()`] only encodes state which should be reused
/// across connections: currently that is just [`Self::shared_heads`]. Every other field is
/// per-session and is re-initialised by [`Self::decode()`].
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct State {
    /// The hashes which we know both peers have. This is the only field written by
    /// [`Self::encode()`].
    pub shared_heads: Vec<ChangeHash>,
    /// The heads we last sent
    pub last_sent_heads: Vec<ChangeHash>,
    /// The heads we last received from them
    pub their_heads: Option<Vec<ChangeHash>>,
    /// Any specific changes they last said they needed
    pub their_need: Option<Vec<ChangeHash>>,
    /// The bloom filters summarising what they said they have
    pub their_have: Option<Vec<Have>>,
    /// The hashes we have sent in this session
    pub sent_hashes: BTreeSet<ChangeHash>,

    /// [`SyncDoc::generate_sync_message()`] should return [`None`] if there are no new changes
    /// to send. In particular, if there are changes in flight which the other end has not yet
    /// acknowledged we do not wish to generate duplicate sync messages. This field tracks whether
    /// the changes we expect to send to the peer based on this sync state have been sent or not. If
    /// [`Self::in_flight`] is [`false`] then [`SyncDoc::generate_sync_message()`] will return a new
    /// message (provided there are in fact changes to send). If it is [`true`] then we don't. This
    /// flag is cleared in [`SyncDoc::receive_sync_message()`].
    pub in_flight: bool,

    /// Whether we have ever responded to the other end. This is used to ensure that we always send
    /// at least one sync message to the other end, even if we have no changes to send, which is
    /// necessary because we want the other end to know what our heads are.
    pub have_responded: bool,

    /// The capabilities the other side has said they have
    pub their_capabilities: Option<Vec<Capability>>,
}
64
/// A summary of the changes that the sender of the message already has.
///
/// This is implicitly a request to the recipient to send all changes that the
/// sender does not already have.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, serde::Serialize)]
pub struct Have {
    /// The heads at the time of the last successful sync with this recipient.
    pub last_sync: Vec<ChangeHash>,
    /// A bloom filter summarising all of the changes that the sender of the message has added
    /// since the last sync (i.e. since [`Self::last_sync`]).
    pub bloom: BloomFilter,
}
76
77impl State {
78    pub fn new() -> Self {
79        Default::default()
80    }
81
82    pub fn encode(&self) -> Vec<u8> {
83        let mut buf = vec![SYNC_STATE_TYPE];
84        encode_hashes(&mut buf, &self.shared_heads);
85        buf
86    }
87
88    pub fn decode(input: &[u8]) -> Result<Self, DecodeError> {
89        let input = parse::Input::new(input);
90        match Self::parse(input) {
91            Ok((_, state)) => Ok(state),
92            Err(parse::ParseError::Incomplete(_)) => Err(DecodeError::NotEnoughInput),
93            Err(parse::ParseError::Error(e)) => Err(e),
94        }
95    }
96
97    pub(crate) fn parse(input: parse::Input<'_>) -> parse::ParseResult<'_, Self, DecodeError> {
98        let (i, record_type) = parse::take1(input)?;
99        if record_type != SYNC_STATE_TYPE {
100            return Err(parse::ParseError::Error(DecodeError::WrongType {
101                expected_one_of: vec![SYNC_STATE_TYPE],
102                found: record_type,
103            }));
104        }
105
106        let (i, shared_heads) = parse::length_prefixed(parse::change_hash)(i)?;
107        Ok((
108            i,
109            Self {
110                shared_heads,
111                last_sent_heads: Vec::new(),
112                their_heads: None,
113                their_need: None,
114                their_have: Some(Vec::new()),
115                sent_hashes: BTreeSet::new(),
116                in_flight: false,
117                have_responded: false,
118                their_capabilities: None,
119            },
120        ))
121    }
122
123    pub(crate) fn supports_v2_messages(&self) -> bool {
124        self.their_capabilities
125            .as_ref()
126            .map(|caps| caps.contains(&Capability::MessageV2))
127            .unwrap_or(false)
128    }
129}