1use std::collections::{BTreeSet, HashSet};
2
3use futures::{future, stream, FutureExt, StreamExt, TryStreamExt};
4use litl::{impl_debug_as_litl, impl_nested_tagged_data_serde, NestedTaggedData};
5use ridl::{
6 hashing::HashOf,
7 signing::{SignatureError, Signed, SignerID, SignerSecret},
8};
9use serde::Serialize;
10use serde_derive::{Deserialize, Serialize};
11use thiserror::Error;
12use tracing::{error, debug, warn};
13
14use crate::{
15 causal_set::{CausalSet, CausalSetItem, OptimisticCausalSetFrontier},
16 telepathic::{
17 ApplyDiffErrorFor, ApplyDiffResult, ApplyDiffSuccess, Telepathic, TelepathicDiff,
18 },
19 StorageBackend, ObjectID,
20};
21
22#[derive(Clone, Serialize, Deserialize)]
23pub struct SetHeader {
24 pub inserter: SignerID,
25 pub meta: Option<litl::Val>,
26}
27
28impl_debug_as_litl!(SetHeader);
29
30#[derive(Debug)]
31pub struct SetState {
32 pub id: SetID,
33 pub header: Option<SetHeader>,
34 pub items: CausalSet<Signed<SetItem>>,
35}
36
37#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
38pub struct SetItem {
39 pub data: litl::Val,
40 pub prev: BTreeSet<SetItemID>,
41}
42
43#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
44pub struct SetItemID(HashOf<SetItem>);
45
46impl SetItemID {
47 pub fn test_random() -> Self {
48 SetItemID(HashOf::hash(&SetItem {
49 data: litl::to_val(&rand::random::<u32>()).unwrap(),
50 prev: BTreeSet::new(),
51 }))
52 }
53}
54
55impl NestedTaggedData for SetItemID {
56 const TAG: &'static str = "setItemID";
57
58 type Inner = HashOf<SetItem>;
59
60 fn as_inner(&self) -> &Self::Inner {
61 &self.0
62 }
63
64 fn from_inner(inner: Self::Inner) -> Self
65 where
66 Self: Sized,
67 {
68 SetItemID(inner)
69 }
70}
71
72impl_nested_tagged_data_serde!(SetItemID);
73impl_debug_as_litl!(SetItemID);
74
75impl SetItem {
76 pub fn new<D: Serialize, I: IntoIterator<Item = SetItemID>>(data: D, prev: I) -> Self {
77 SetItem {
78 data: litl::to_val(&data).unwrap(),
79 prev: prev.into_iter().collect(),
80 }
81 }
82
83 pub fn id(&self) -> SetItemID {
84 SetItemID(HashOf::hash(self))
85 }
86}
87
88impl CausalSetItem for Signed<SetItem> {
89 type ID = SetItemID;
90
91 fn id(&self) -> Self::ID {
92 self.attested.id()
93 }
94
95 fn prev(&self) -> HashSet<&Self::ID> {
96 self.prev.iter().collect()
97 }
98}
99
100impl SetState {
101 pub fn new_empty(id: SetID) -> Self {
102 Self {
103 id,
104 header: None,
105 items: CausalSet::new(),
106 }
107 }
108
109 pub fn new<M: Serialize>(meta: Option<M>) -> (Self, SetWriteAccess) {
110 let signer_secret = SignerSecret::new_random();
111
112 let header = SetHeader {
113 inserter: signer_secret.pub_id(),
114 meta: meta.map(|meta| litl::to_val(&meta).unwrap()),
115 };
116
117 let id = SetID(HashOf::hash(&header));
118
119 (
120 Self {
121 id,
122 header: Some(header),
123 items: CausalSet::new(),
124 },
125 SetWriteAccess(signer_secret),
126 )
127 }
128}
129
130#[derive(Copy, Clone, PartialEq, Eq, Hash)]
131pub struct SetID(HashOf<SetHeader>);
132
133impl NestedTaggedData for SetID {
134 const TAG: &'static str = "set";
135
136 type Inner = HashOf<SetHeader>;
137
138 fn as_inner(&self) -> &Self::Inner {
139 &self.0
140 }
141
142 fn from_inner(inner: Self::Inner) -> Self
143 where
144 Self: Sized,
145 {
146 SetID(inner)
147 }
148}
149
150impl_nested_tagged_data_serde!(SetID);
151
152impl_debug_as_litl!(SetID);
153
154pub struct SetWriteAccess(pub(crate) SignerSecret);
155
156impl NestedTaggedData for SetWriteAccess {
157 const TAG: &'static str = "setWriteAccess";
158
159 type Inner = SignerSecret;
160
161 fn as_inner(&self) -> &Self::Inner {
162 &self.0
163 }
164
165 fn from_inner(inner: Self::Inner) -> Self
166 where
167 Self: Sized,
168 {
169 SetWriteAccess(inner)
170 }
171}
172
173impl_nested_tagged_data_serde!(SetWriteAccess);
174impl_debug_as_litl!(SetWriteAccess);
175
176#[derive(Clone, Serialize, Deserialize, Debug)]
177pub struct SetDiff {
178 pub id: SetID,
179 pub header: Option<SetHeader>,
180 pub new_items: Vec<Signed<SetItem>>,
181}
182
183impl TelepathicDiff for SetDiff {
184 type ID = SetID;
185
186 fn id(&self) -> SetID {
187 self.id
188 }
189}
190
191#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
192pub struct SetStateInfo {
193 frontier: OptimisticCausalSetFrontier<Signed<SetItem>>,
194 has_header: bool,
195}
196
197impl_debug_as_litl!(SetStateInfo);
198
199impl Telepathic for SetState {
200 type ID = SetID;
201 type WriteAccess = SetWriteAccess;
202 type StateInfo = SetStateInfo;
203 type Diff = SetDiff;
204 type Error = SetError;
205
206 fn id(&self) -> Self::ID {
207 self.id
208 }
209
210 fn try_apply_diff(
211 &mut self,
212 diff: Self::Diff,
213 ) -> ApplyDiffResult<Self::StateInfo, Self::ID, Self::Diff, Self::Error> {
214 let (header, got_header_first_time) = match (&mut self.header, &diff.header) {
215 (None, None) => {
216 return Ok(None)
218 }
219 (own @ None, Some(diff_header)) => {
220 if HashOf::hash(diff_header) != self.id.0 {
221 return Err(SetError::InvalidHeaderHash.into());
222 }
223 *own = Some(diff_header.clone());
224 (own.as_ref().unwrap(), true)
225 }
226 (Some(own_header), _) => (&*own_header, false),
227 };
228
229 let mut redundant_items = 0;
230
231 let mut actually_new_items = Vec::new();
232
233 for new_item in &diff.new_items {
234 if self.items.contains_key(&new_item.id()) {
236 redundant_items += 1;
237 continue;
238 }
239 new_item
240 .ensure_signed_by(&header.inserter)
241 .map_err(SetError::InvalidSignature)?;
242
243 self.items.insert(new_item.clone());
244 actually_new_items.push(new_item.clone());
245 }
246
247 if redundant_items > 0 {
248 warn!("Got {} redundant items in applied set diff", redundant_items)
249 }
250
251 let effective_diff = SetDiff {
252 id: self.id,
253 header: if got_header_first_time { Some(header.clone()) } else { None },
254 new_items: actually_new_items,
255 };
256 Ok(Some(ApplyDiffSuccess {
257 new_state_info: SetStateInfo {
258 frontier: self.items.as_optimistic_frontier(),
259 has_header: self.header.is_some(),
260 },
261 effective_diff,
262 }))
263 }
264
265 fn state_info(&self) -> Option<Self::StateInfo> {
266 let frontier = self.items.as_optimistic_frontier();
267 if self.header.is_none() {
268 None
269 } else {
270 Some(SetStateInfo {
271 frontier,
272 has_header: true,
273 })
274 }
275 }
276
277 fn diff_since(&self, state_info: Option<&Self::StateInfo>) -> Option<Self::Diff> {
278 if let Some(mut state_info) = state_info.cloned() {
279 let frontier = state_info.frontier.resolve(&self.items);
280 let (new_items, _) = self.items.items_after(frontier);
281
282 if new_items.is_empty() && state_info.has_header {
283 None
284 } else {
285 Some(SetDiff {
286 id: self.id,
287 header: self.header.clone(),
288 new_items,
289 })
290 }
291 } else {
292 Some(SetDiff {
293 id: self.id,
294 header: self.header.clone(),
295 new_items: self.items.values_ordered(),
296 })
297 }
298 }
299
300 fn load(
301 id: SetID,
302 storage: Box<dyn StorageBackend>,
303 ) -> std::pin::Pin<Box<dyn futures::Stream<Item = Self::Diff>>> {
304 let key = id.to_string();
305
306 let header_stream = stream::once(storage.get_key(&format!("header_{}", key))).filter_map(
307 move |maybe_header_bytes| {
308 future::ready(maybe_header_bytes.and_then(|header_bytes| {
309 litl::from_slice(&header_bytes)
310 .map_err(|err| {
311 error!(err = ?err, id = ?id, "Failed to read loaded set header");
312 err
313 })
314 .ok()
315 .map(|header| SetDiff {
316 id,
317 header: Some(header),
318 new_items: vec![],
319 })
320 }))
321 },
322 );
323
324 let items_stream =
325 litl::read_newln_sep_stream(storage.get_stream(&key).map(Ok).into_async_read())
326 .filter_map(move |read_result| {
327 future::ready(match read_result {
328 Ok(item) => Some(item),
329 Err(err) => {
330 error!(err = ?err, id = ?id, "Failed to read loaded set item");
331 None
332 }
333 })
334 })
335 .ready_chunks(1000)
336 .map(move |items| SetDiff {
338 id,
339 header: None,
340 new_items: items,
341 });
342
343 header_stream.chain(items_stream).boxed_local()
344 }
345
346 fn store(
347 effective_diff: Self::Diff,
348 storage: Box<dyn StorageBackend>,
349 ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()>>> {
350 let key = effective_diff.id.to_string();
351 async move {
352 if let Some(header) = &effective_diff.header {
353 storage
354 .set_key(&format!("header_{}", key), litl::to_vec(&header).unwrap())
355 .await;
356 }
357
358 if !effective_diff.new_items.is_empty() {
359 storage
360 .append_to_stream(
361 &key,
362 litl::to_newln_sep_vec(effective_diff.new_items.iter()).unwrap(),
363 None,
364 )
365 .await;
366 }
367 }
368 .boxed_local()
369 }
370}
371
372#[derive(Error, Debug)]
373pub enum SetError {
374 #[error("Invalid header hash")]
375 InvalidHeaderHash,
376 #[error(transparent)]
377 InvalidSignature(#[from] SignatureError),
378}