jazz_telepathy/
sets.rs

1use std::collections::{BTreeSet, HashMap, HashSet};
2
3use audi::{Listener, Listeners};
4use conundrum::{
5    hashing::HashOf,
6    purpose,
7    signing::{SignKeyPair, SignPublicKey, SignatureError, Signed},
8};
9use litl::impl_debug_as_litl;
10use serde_derive::{Deserialize, Serialize};
11use thiserror::Error;
12
13use crate::{
14    causal_set::{CausalSet, CausalSetItem, OptimisticCausalSetFrontier},
15    UpdateSource, KEEP_UNKNOWN,
16};
17
// Introduces `SetInsert`, which is used throughout this file as the type
// parameter on signing keys and signatures (`SignKeyPair<SetInsert>`,
// `SignPublicKey<SetInsert>`, `Signed<_, SetInsert>`).
// NOTE(review): presumably `purpose!` declares a marker type for key-purpose
// separation — confirm against the `conundrum` crate.
purpose!(SetInsert);
19
/// Identifier of a set.
///
/// Wraps the public half of the set's signing keypair: `SetWriteAccess::id`
/// builds it from `SignKeyPair::public()`. It is `Copy` and hashable, and is
/// used as the key of the `Sets` map.
#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename="Telepathy/SetID")]
pub struct SetID(SignPublicKey<SetInsert>);
23
// `Debug` for `SetID` is supplied by this `litl` macro rather than derived.
// NOTE(review): presumably renders the litl-serialized form — confirm in `litl`.
impl_debug_as_litl!(SetID);
25
/// A [`SetItem`] wrapped in `Signed` with the `SetInsert` purpose; this is the
/// element type stored in each set's `CausalSet`.
pub type SignedSetItem = Signed<SetItem, SetInsert>;
27
/// Write capability for a single set.
///
/// Wraps the set's signing keypair; `Sets::insert` uses it to sign new items.
/// NOTE(review): this type is `Serialize`/`Deserialize`, so a serialized value
/// presumably contains the secret key material — treat it accordingly.
#[derive(Serialize, Deserialize)]
pub struct SetWriteAccess(SignKeyPair<SetInsert>);
30
31impl SetWriteAccess {
32    pub fn id(&self) -> SetID {
33        SetID(self.0.public())
34    }
35}
36
/// Registry of all sets known locally, keyed by [`SetID`].
///
/// `Default` yields an empty registry.
#[derive(Default)]
pub struct Sets {
    // Per-set state (items, listeners, priority).
    sets: HashMap<SetID, SetState>,
}
41
42impl CausalSetItem for SignedSetItem {
43    type ID = HashOf<SetItem>;
44
45    fn id(&self) -> Self::ID {
46        HashOf::hash(&self.verified)
47    }
48
49    fn prev(&self) -> HashSet<&Self::ID> {
50        self.prev.iter().collect()
51    }
52}
53
/// Everything tracked for one set.
///
/// `Default` is used to create the state lazily via `entry(..).or_default()`.
#[derive(Default)]
struct SetState {
    // Reported through `Sets::known_state`; not modified anywhere in this file.
    priority: u8,
    // The signed items of the set, organised by their `prev` links.
    items: CausalSet<SignedSetItem>,
    // Receivers of insert notifications, each tagged with its `UpdateSource`.
    listeners: Listeners<(SetInsertMessage, UpdateSource)>,
}
60
/// A single (unsigned) entry of a set.
///
/// Its hash (`HashOf<SetItem>`) serves as the item's ID.
#[derive(Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
pub struct SetItem {
    /// Opaque payload bytes; (de)serialized through `serde_bytes`.
    #[serde(with = "serde_bytes")]
    pub data: Vec<u8>,
    /// Hashes of the items this one names as predecessors; exposed to the
    /// causal set through `CausalSetItem::prev`.
    pub prev: BTreeSet<HashOf<SetItem>>,
}
67
// `Debug` for `SetItem` is supplied by this `litl` macro rather than derived.
// NOTE(review): presumably renders the litl-serialized form — confirm in `litl`.
impl_debug_as_litl!(SetItem);
69
/// A batch of signed items for one set.
///
/// Used both as the input of `Sets::accept_insert` and as the payload
/// delivered to listeners.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetInsertMessage {
    // Target set; private, so it can only be set from within this module.
    set_id: SetID,
    /// The signed items carried by this message.
    pub new_items: HashSet<SignedSetItem>,
}
75
/// Summary of how much of a set is known, as produced by
/// `Sets::known_state` and consumed by `Sets::get_inserts_since`.
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct SetKnownState {
    /// Priority copied from the set's state.
    pub priority: u8,
    /// Optimistic frontier of the set's items (see `CausalSet::as_optimistic_frontier`).
    pub frontier: OptimisticCausalSetFrontier<SignedSetItem>,
}
81
82impl SetKnownState {
83    pub fn update_optimistically(&mut self, insert: SetInsertMessage, underlying_set: Option<&CausalSet<SignedSetItem>>) {
84        self.frontier.insert_items(insert.new_items, underlying_set)
85    }
86}
87
/// Errors returned by set operations.
#[derive(Error, Debug)]
pub enum SetError {
    /// A signature check failed. Displays exactly like the wrapped
    /// `SignatureError` (`transparent`) and converts from it via `From`.
    #[error(transparent)]
    InvalidSignature(#[from] SignatureError),
}
93
94impl Sets {
95    pub fn create_set(&mut self) -> SetWriteAccess {
96        let keypair = SignKeyPair::new_random();
97
98        match self.sets.entry(SetID(keypair.public())) {
99            std::collections::hash_map::Entry::Occupied(_) => {
100                unreachable!("Should never have existing set for new keypair")
101            }
102            std::collections::hash_map::Entry::Vacant(entry) => entry.insert(SetState::default()),
103        };
104
105        SetWriteAccess(keypair)
106    }
107
108    // TODO(correctness): Somehow make sure only one consumer can insert
109    pub fn insert<P: IntoIterator<Item = HashOf<SetItem>>>(
110        &mut self,
111        access: &SetWriteAccess,
112        data: Vec<u8>,
113        prev: P,
114    ) -> Result<HashOf<SetItem>, SetError> {
115        self.sets
116            .get(&access.id())
117            .expect("Set should be created or loaded when appending");
118
119        let set_item = SetItem {
120            data,
121            prev: prev.into_iter().collect(),
122        };
123
124        let set_item_hash = HashOf::hash(&set_item);
125
126        self.accept_insert(
127            &SetInsertMessage {
128                set_id: access.id(),
129                new_items: [access.0.sign(set_item)].into_iter().collect(),
130            },
131            UpdateSource::CreatedLocally,
132        )
133        .map(|_| set_item_hash)
134    }
135
136    pub fn current_items(&self, set_id: &SetID) -> Option<&CausalSet<SignedSetItem>> {
137        self.sets.get(set_id).map(|set_state| {
138            &set_state
139                .items
140
141        })
142    }
143
144    pub fn accept_insert(
145        &mut self,
146        insert: &SetInsertMessage,
147        source: UpdateSource,
148    ) -> Result<AcceptInsertResult, SetError> {
149        let set = if KEEP_UNKNOWN {
150            self.sets.entry(insert.set_id).or_default()
151        } else if let Some(set) = self.sets.get_mut(&insert.set_id) {
152            set
153        } else {
154            // ignore unknown set append
155            return Ok(AcceptInsertResult::AllConnected);
156        };
157
158        let new_connected_items: HashSet<SignedSetItem> = insert
159            .new_items
160            .iter()
161            .flat_map(|new_signed_item| set.items.insert(new_signed_item.clone()))
162            .collect();
163
164        let all_inserts_connected = new_connected_items.is_superset(&insert.new_items);
165
166        set.listeners.emit((
167            SetInsertMessage {
168                new_items: new_connected_items,
169                set_id: insert.set_id,
170            },
171            source,
172        ));
173
174        if all_inserts_connected {
175            Ok(AcceptInsertResult::AllConnected)
176        } else {
177            Ok(AcceptInsertResult::HasDisconnected)
178        }
179    }
180
181    pub(crate) fn get_inserts_since(
182        &self,
183        set_id: &SetID,
184        known_state: &mut SetKnownState,
185    ) -> Option<SetInsertMessage> {
186        self.sets.get(set_id).and_then(|set| {
187            let (new_items, _) = set
188                .items
189                .items_after(known_state.frontier.resolve(&set.items));
190            if new_items.is_empty() {
191                None
192            } else {
193                Some(SetInsertMessage {
194                    new_items: new_items.into_iter().collect(),
195                    set_id: *set_id,
196                })
197            }
198        })
199    }
200
201    pub fn add_listener(
202        &mut self,
203        set_id: SetID,
204        listener: Box<dyn Listener<(SetInsertMessage, UpdateSource)>>,
205    ) {
206        let set = self.sets.entry(set_id).or_default();
207
208        set.listeners.add_with_initial_msg(
209            listener,
210            if !set.items.is_empty() {
211                Some((
212                    SetInsertMessage {
213                        set_id,
214                        new_items: set.items.values().cloned().collect(),
215                    },
216                    UpdateSource::CurrentState,
217                ))
218            } else {
219                None
220            },
221        )
222    }
223
224    pub(crate) fn all_set_ids(&self) -> impl Iterator<Item = &SetID> {
225        self.sets.keys()
226    }
227
228    pub(crate) fn known_state(&self, set_id: &SetID) -> Option<SetKnownState> {
229        self.sets.get(set_id).map(|set| SetKnownState {
230            priority: set.priority,
231            frontier: set.items.as_optimistic_frontier()
232        })
233    }
234}
235
/// Outcome of `Sets::accept_insert`.
///
/// Derives `Debug`, `Clone`, `Copy`, `PartialEq` and `Eq` so callers can log
/// and compare results, and so `Result<AcceptInsertResult, _>` supports
/// helpers such as `unwrap_err` that require `Debug` on the success type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AcceptInsertResult {
    /// Every item of the insert message was among the items reported as
    /// inserted, or the message targeted an unknown set and was ignored.
    AllConnected,
    /// At least one item of the insert message was not among the items
    /// reported as inserted.
    HasDisconnected,
}