1use std::collections::{BTreeSet, HashMap, HashSet};
2
3use audi::{Listener, Listeners};
4use conundrum::{
5 hashing::HashOf,
6 purpose,
7 signing::{SignKeyPair, SignPublicKey, SignatureError, Signed},
8};
9use litl::impl_debug_as_litl;
10use serde_derive::{Deserialize, Serialize};
11use thiserror::Error;
12
13use crate::{
14 causal_set::{CausalSet, CausalSetItem, OptimisticCausalSetFrontier},
15 UpdateSource, KEEP_UNKNOWN,
16};
17
18purpose!(SetInsert);
19
20#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
21#[serde(rename="Telepathy/SetID")]
22pub struct SetID(SignPublicKey<SetInsert>);
23
24impl_debug_as_litl!(SetID);
25
26pub type SignedSetItem = Signed<SetItem, SetInsert>;
27
28#[derive(Serialize, Deserialize)]
29pub struct SetWriteAccess(SignKeyPair<SetInsert>);
30
31impl SetWriteAccess {
32 pub fn id(&self) -> SetID {
33 SetID(self.0.public())
34 }
35}
36
37#[derive(Default)]
38pub struct Sets {
39 sets: HashMap<SetID, SetState>,
40}
41
42impl CausalSetItem for SignedSetItem {
43 type ID = HashOf<SetItem>;
44
45 fn id(&self) -> Self::ID {
46 HashOf::hash(&self.verified)
47 }
48
49 fn prev(&self) -> HashSet<&Self::ID> {
50 self.prev.iter().collect()
51 }
52}
53
54#[derive(Default)]
55struct SetState {
56 priority: u8,
57 items: CausalSet<SignedSetItem>,
58 listeners: Listeners<(SetInsertMessage, UpdateSource)>,
59}
60
61#[derive(Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
62pub struct SetItem {
63 #[serde(with = "serde_bytes")]
64 pub data: Vec<u8>,
65 pub prev: BTreeSet<HashOf<SetItem>>,
66}
67
68impl_debug_as_litl!(SetItem);
69
70#[derive(Clone, Debug, Serialize, Deserialize)]
71pub struct SetInsertMessage {
72 set_id: SetID,
73 pub new_items: HashSet<SignedSetItem>,
74}
75
76#[derive(Clone, Default, Debug, Serialize, Deserialize)]
77pub struct SetKnownState {
78 pub priority: u8,
79 pub frontier: OptimisticCausalSetFrontier<SignedSetItem>,
80}
81
82impl SetKnownState {
83 pub fn update_optimistically(&mut self, insert: SetInsertMessage, underlying_set: Option<&CausalSet<SignedSetItem>>) {
84 self.frontier.insert_items(insert.new_items, underlying_set)
85 }
86}
87
88#[derive(Error, Debug)]
89pub enum SetError {
90 #[error(transparent)]
91 InvalidSignature(#[from] SignatureError),
92}
93
94impl Sets {
95 pub fn create_set(&mut self) -> SetWriteAccess {
96 let keypair = SignKeyPair::new_random();
97
98 match self.sets.entry(SetID(keypair.public())) {
99 std::collections::hash_map::Entry::Occupied(_) => {
100 unreachable!("Should never have existing set for new keypair")
101 }
102 std::collections::hash_map::Entry::Vacant(entry) => entry.insert(SetState::default()),
103 };
104
105 SetWriteAccess(keypair)
106 }
107
108 pub fn insert<P: IntoIterator<Item = HashOf<SetItem>>>(
110 &mut self,
111 access: &SetWriteAccess,
112 data: Vec<u8>,
113 prev: P,
114 ) -> Result<HashOf<SetItem>, SetError> {
115 self.sets
116 .get(&access.id())
117 .expect("Set should be created or loaded when appending");
118
119 let set_item = SetItem {
120 data,
121 prev: prev.into_iter().collect(),
122 };
123
124 let set_item_hash = HashOf::hash(&set_item);
125
126 self.accept_insert(
127 &SetInsertMessage {
128 set_id: access.id(),
129 new_items: [access.0.sign(set_item)].into_iter().collect(),
130 },
131 UpdateSource::CreatedLocally,
132 )
133 .map(|_| set_item_hash)
134 }
135
136 pub fn current_items(&self, set_id: &SetID) -> Option<&CausalSet<SignedSetItem>> {
137 self.sets.get(set_id).map(|set_state| {
138 &set_state
139 .items
140
141 })
142 }
143
144 pub fn accept_insert(
145 &mut self,
146 insert: &SetInsertMessage,
147 source: UpdateSource,
148 ) -> Result<AcceptInsertResult, SetError> {
149 let set = if KEEP_UNKNOWN {
150 self.sets.entry(insert.set_id).or_default()
151 } else if let Some(set) = self.sets.get_mut(&insert.set_id) {
152 set
153 } else {
154 return Ok(AcceptInsertResult::AllConnected);
156 };
157
158 let new_connected_items: HashSet<SignedSetItem> = insert
159 .new_items
160 .iter()
161 .flat_map(|new_signed_item| set.items.insert(new_signed_item.clone()))
162 .collect();
163
164 let all_inserts_connected = new_connected_items.is_superset(&insert.new_items);
165
166 set.listeners.emit((
167 SetInsertMessage {
168 new_items: new_connected_items,
169 set_id: insert.set_id,
170 },
171 source,
172 ));
173
174 if all_inserts_connected {
175 Ok(AcceptInsertResult::AllConnected)
176 } else {
177 Ok(AcceptInsertResult::HasDisconnected)
178 }
179 }
180
181 pub(crate) fn get_inserts_since(
182 &self,
183 set_id: &SetID,
184 known_state: &mut SetKnownState,
185 ) -> Option<SetInsertMessage> {
186 self.sets.get(set_id).and_then(|set| {
187 let (new_items, _) = set
188 .items
189 .items_after(known_state.frontier.resolve(&set.items));
190 if new_items.is_empty() {
191 None
192 } else {
193 Some(SetInsertMessage {
194 new_items: new_items.into_iter().collect(),
195 set_id: *set_id,
196 })
197 }
198 })
199 }
200
201 pub fn add_listener(
202 &mut self,
203 set_id: SetID,
204 listener: Box<dyn Listener<(SetInsertMessage, UpdateSource)>>,
205 ) {
206 let set = self.sets.entry(set_id).or_default();
207
208 set.listeners.add_with_initial_msg(
209 listener,
210 if !set.items.is_empty() {
211 Some((
212 SetInsertMessage {
213 set_id,
214 new_items: set.items.values().cloned().collect(),
215 },
216 UpdateSource::CurrentState,
217 ))
218 } else {
219 None
220 },
221 )
222 }
223
224 pub(crate) fn all_set_ids(&self) -> impl Iterator<Item = &SetID> {
225 self.sets.keys()
226 }
227
228 pub(crate) fn known_state(&self, set_id: &SetID) -> Option<SetKnownState> {
229 self.sets.get(set_id).map(|set| SetKnownState {
230 priority: set.priority,
231 frontier: set.items.as_optimistic_frontier()
232 })
233 }
234}
235
/// Outcome of applying an insert message to a set.
///
/// The type is a plain two-state flag, so it derives the common value traits:
/// callers can copy it, compare it, and debug-print it (which also makes
/// `Result<AcceptInsertResult, _>::unwrap_err` / `expect_err` usable).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AcceptInsertResult {
    /// Every item of the message was reported as connected by the set.
    AllConnected,
    /// At least one item of the message was not reported as connected.
    HasDisconnected,
}