endr/
set.rs

1use std::collections::{BTreeSet, HashSet};
2
3use futures::{future, stream, FutureExt, StreamExt, TryStreamExt};
4use litl::{impl_debug_as_litl, impl_nested_tagged_data_serde, NestedTaggedData};
5use ridl::{
6    hashing::HashOf,
7    signing::{SignatureError, Signed, SignerID, SignerSecret},
8};
9use serde::Serialize;
10use serde_derive::{Deserialize, Serialize};
11use thiserror::Error;
12use tracing::{error, debug, warn};
13
14use crate::{
15    causal_set::{CausalSet, CausalSetItem, OptimisticCausalSetFrontier},
16    telepathic::{
17        ApplyDiffErrorFor, ApplyDiffResult, ApplyDiffSuccess, Telepathic, TelepathicDiff,
18    },
19    StorageBackend, ObjectID,
20};
21
/// Header of a set.
///
/// The hash of this header is what a [`SetID`] wraps: `SetState::new` derives the
/// ID from it and `try_apply_diff` rejects headers whose hash differs from the ID.
#[derive(Clone, Serialize, Deserialize)]
pub struct SetHeader {
    /// Signer that every item applied to the set must be signed by
    /// (checked with `ensure_signed_by` when a diff is applied).
    pub inserter: SignerID,
    /// Optional caller-supplied metadata, stored as a `litl` value.
    pub meta: Option<litl::Val>,
}

// NOTE(review): presumably provides `Debug` by rendering the value through litl — confirm in `litl`.
impl_debug_as_litl!(SetHeader);
29
/// In-memory state of one set: its ID, its header (once known) and its signed items.
#[derive(Debug)]
pub struct SetState {
    /// ID of the set; compared against the hash of any header received in a diff.
    pub id: SetID,
    /// Header of the set. `None` for a state created with `new_empty` until a diff
    /// carrying a matching header has been applied.
    pub header: Option<SetHeader>,
    /// The signed items currently held for this set.
    pub items: CausalSet<Signed<SetItem>>,
}
36
/// A single entry of a set. Its [`SetItemID`] is the hash of the whole item.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SetItem {
    /// Payload of the item, stored as a `litl` value.
    pub data: litl::Val,
    /// IDs of other items this item refers to; exposed as `prev()` to the causal set.
    /// NOTE(review): presumably the item's causal predecessors — confirm against `CausalSet`.
    pub prev: BTreeSet<SetItemID>,
}
42
43#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
44pub struct SetItemID(HashOf<SetItem>);
45
46impl SetItemID {
47    pub fn test_random() -> Self {
48        SetItemID(HashOf::hash(&SetItem {
49            data: litl::to_val(&rand::random::<u32>()).unwrap(),
50            prev: BTreeSet::new(),
51        }))
52    }
53}
54
55impl NestedTaggedData for SetItemID {
56    const TAG: &'static str = "setItemID";
57
58    type Inner = HashOf<SetItem>;
59
60    fn as_inner(&self) -> &Self::Inner {
61        &self.0
62    }
63
64    fn from_inner(inner: Self::Inner) -> Self
65    where
66        Self: Sized,
67    {
68        SetItemID(inner)
69    }
70}
71
72impl_nested_tagged_data_serde!(SetItemID);
73impl_debug_as_litl!(SetItemID);
74
75impl SetItem {
76    pub fn new<D: Serialize, I: IntoIterator<Item = SetItemID>>(data: D, prev: I) -> Self {
77        SetItem {
78            data: litl::to_val(&data).unwrap(),
79            prev: prev.into_iter().collect(),
80        }
81    }
82
83    pub fn id(&self) -> SetItemID {
84        SetItemID(HashOf::hash(self))
85    }
86}
87
88impl CausalSetItem for Signed<SetItem> {
89    type ID = SetItemID;
90
91    fn id(&self) -> Self::ID {
92        self.attested.id()
93    }
94
95    fn prev(&self) -> HashSet<&Self::ID> {
96        self.prev.iter().collect()
97    }
98}
99
100impl SetState {
101    pub fn new_empty(id: SetID) -> Self {
102        Self {
103            id,
104            header: None,
105            items: CausalSet::new(),
106        }
107    }
108
109    pub fn new<M: Serialize>(meta: Option<M>) -> (Self, SetWriteAccess) {
110        let signer_secret = SignerSecret::new_random();
111
112        let header = SetHeader {
113            inserter: signer_secret.pub_id(),
114            meta: meta.map(|meta| litl::to_val(&meta).unwrap()),
115        };
116
117        let id = SetID(HashOf::hash(&header));
118
119        (
120            Self {
121                id,
122                header: Some(header),
123                items: CausalSet::new(),
124            },
125            SetWriteAccess(signer_secret),
126        )
127    }
128}
129
130#[derive(Copy, Clone, PartialEq, Eq, Hash)]
131pub struct SetID(HashOf<SetHeader>);
132
133impl NestedTaggedData for SetID {
134    const TAG: &'static str = "set";
135
136    type Inner = HashOf<SetHeader>;
137
138    fn as_inner(&self) -> &Self::Inner {
139        &self.0
140    }
141
142    fn from_inner(inner: Self::Inner) -> Self
143    where
144        Self: Sized,
145    {
146        SetID(inner)
147    }
148}
149
150impl_nested_tagged_data_serde!(SetID);
151
152impl_debug_as_litl!(SetID);
153
154pub struct SetWriteAccess(pub(crate) SignerSecret);
155
156impl NestedTaggedData for SetWriteAccess {
157    const TAG: &'static str = "setWriteAccess";
158
159    type Inner = SignerSecret;
160
161    fn as_inner(&self) -> &Self::Inner {
162        &self.0
163    }
164
165    fn from_inner(inner: Self::Inner) -> Self
166    where
167        Self: Sized,
168    {
169        SetWriteAccess(inner)
170    }
171}
172
173impl_nested_tagged_data_serde!(SetWriteAccess);
174impl_debug_as_litl!(SetWriteAccess);
175
176#[derive(Clone, Serialize, Deserialize, Debug)]
177pub struct SetDiff {
178    pub id: SetID,
179    pub header: Option<SetHeader>,
180    pub new_items: Vec<Signed<SetItem>>,
181}
182
183impl TelepathicDiff for SetDiff {
184    type ID = SetID;
185
186    fn id(&self) -> SetID {
187        self.id
188    }
189}
190
/// Compact summary of what a `SetState` already holds; passed to `diff_since` so
/// that only missing items need to be sent.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SetStateInfo {
    /// Optimistic frontier of the items held, as produced by `as_optimistic_frontier`.
    frontier: OptimisticCausalSetFrontier<Signed<SetItem>>,
    /// Whether the state described by this info knows the set's header.
    has_header: bool,
}

// NOTE(review): presumably provides `Debug` by rendering the value through litl — confirm in `litl`.
impl_debug_as_litl!(SetStateInfo);
198
199impl Telepathic for SetState {
200    type ID = SetID;
201    type WriteAccess = SetWriteAccess;
202    type StateInfo = SetStateInfo;
203    type Diff = SetDiff;
204    type Error = SetError;
205
206    fn id(&self) -> Self::ID {
207        self.id
208    }
209
210    fn try_apply_diff(
211        &mut self,
212        diff: Self::Diff,
213    ) -> ApplyDiffResult<Self::StateInfo, Self::ID, Self::Diff, Self::Error> {
214        let (header, got_header_first_time) = match (&mut self.header, &diff.header) {
215            (None, None) => {
216                //return Err(ApplyDiffErrorFor::InvalidKnownStateAssumption(ObjectID::Set(self.id), "Didn't get set header even though we also don't have it".to_owned()));
217                return Ok(None)
218            }
219            (own @ None, Some(diff_header)) => {
220                if HashOf::hash(diff_header) != self.id.0 {
221                    return Err(SetError::InvalidHeaderHash.into());
222                }
223                *own = Some(diff_header.clone());
224                (own.as_ref().unwrap(), true)
225            }
226            (Some(own_header), _) => (&*own_header, false),
227        };
228
229        let mut redundant_items = 0;
230
231        let mut actually_new_items = Vec::new();
232
233        for new_item in &diff.new_items {
234            // performance shortcut: ignore items that are already in the set, potentially ignoring malformed incoming ones
235            if self.items.contains_key(&new_item.id()) {
236                redundant_items += 1;
237                continue;
238            }
239            new_item
240                .ensure_signed_by(&header.inserter)
241                .map_err(SetError::InvalidSignature)?;
242
243            self.items.insert(new_item.clone());
244            actually_new_items.push(new_item.clone());
245        }
246
247        if redundant_items > 0 {
248            warn!("Got {} redundant items in applied set diff", redundant_items)
249        }
250
251        let effective_diff = SetDiff {
252            id: self.id,
253            header: if got_header_first_time { Some(header.clone()) } else { None },
254            new_items: actually_new_items,
255        };
256        Ok(Some(ApplyDiffSuccess {
257            new_state_info: SetStateInfo {
258                frontier: self.items.as_optimistic_frontier(),
259                has_header: self.header.is_some(),
260            },
261            effective_diff,
262        }))
263    }
264
265    fn state_info(&self) -> Option<Self::StateInfo> {
266        let frontier = self.items.as_optimistic_frontier();
267        if self.header.is_none() {
268            None
269        } else {
270            Some(SetStateInfo {
271                frontier,
272                has_header: true,
273            })
274        }
275    }
276
277    fn diff_since(&self, state_info: Option<&Self::StateInfo>) -> Option<Self::Diff> {
278        if let Some(mut state_info) = state_info.cloned() {
279            let frontier = state_info.frontier.resolve(&self.items);
280            let (new_items, _) = self.items.items_after(frontier);
281
282            if new_items.is_empty() && state_info.has_header {
283                None
284            } else {
285                Some(SetDiff {
286                    id: self.id,
287                    header: self.header.clone(),
288                    new_items,
289                })
290            }
291        } else {
292            Some(SetDiff {
293                id: self.id,
294                header: self.header.clone(),
295                new_items: self.items.values_ordered(),
296            })
297        }
298    }
299
300    fn load(
301        id: SetID,
302        storage: Box<dyn StorageBackend>,
303    ) -> std::pin::Pin<Box<dyn futures::Stream<Item = Self::Diff>>> {
304        let key = id.to_string();
305
306        let header_stream = stream::once(storage.get_key(&format!("header_{}", key))).filter_map(
307            move |maybe_header_bytes| {
308                future::ready(maybe_header_bytes.and_then(|header_bytes| {
309                    litl::from_slice(&header_bytes)
310                        .map_err(|err| {
311                            error!(err = ?err, id = ?id, "Failed to read loaded set header");
312                            err
313                        })
314                        .ok()
315                        .map(|header| SetDiff {
316                            id,
317                            header: Some(header),
318                            new_items: vec![],
319                        })
320                }))
321            },
322        );
323
324        let items_stream =
325            litl::read_newln_sep_stream(storage.get_stream(&key).map(Ok).into_async_read())
326                .filter_map(move |read_result| {
327                    future::ready(match read_result {
328                        Ok(item) => Some(item),
329                        Err(err) => {
330                            error!(err = ?err, id = ?id, "Failed to read loaded set item");
331                            None
332                        }
333                    })
334                })
335                .ready_chunks(1000)
336                // .map(|item| vec![item])
337                .map(move |items| SetDiff {
338                    id,
339                    header: None,
340                    new_items: items,
341                });
342
343        header_stream.chain(items_stream).boxed_local()
344    }
345
346    fn store(
347        effective_diff: Self::Diff,
348        storage: Box<dyn StorageBackend>,
349    ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()>>> {
350        let key = effective_diff.id.to_string();
351        async move {
352            if let Some(header) = &effective_diff.header {
353                storage
354                    .set_key(&format!("header_{}", key), litl::to_vec(&header).unwrap())
355                    .await;
356            }
357
358            if !effective_diff.new_items.is_empty() {
359                storage
360                    .append_to_stream(
361                        &key,
362                        litl::to_newln_sep_vec(effective_diff.new_items.iter()).unwrap(),
363                        None,
364                    )
365                    .await;
366            }
367        }
368        .boxed_local()
369    }
370}
371
/// Errors that can occur while applying a diff to a set.
#[derive(Error, Debug)]
pub enum SetError {
    /// A header was offered whose hash does not equal the set's ID.
    #[error("Invalid header hash")]
    InvalidHeaderHash,
    /// An item failed the `ensure_signed_by` check against the header's `inserter`;
    /// displays as the underlying signature error.
    #[error(transparent)]
    InvalidSignature(#[from] SignatureError),
}