// endr/log.rs

1use futures::{future, stream, FutureExt, StreamExt};
2use litl::{impl_debug_as_litl, impl_nested_tagged_data_serde, NestedTaggedData};
3use ridl::{
4    hashing::{HashOf, StrongHash, StrongHasher},
5    signing::{SignatureError, Signed, SignerID, SignerSecret},
6};
7use serde::Serialize;
8use serde_derive::{Deserialize, Serialize};
9use thiserror::Error;
10use tracing::{debug, error, info};
11
12use crate::{
13    telepathic::{
14        ApplyDiffErrorFor, ApplyDiffResult, ApplyDiffSuccess, Telepathic, TelepathicDiff,
15    },
16    ObjectID, StorageBackend,
17};
18
/// Immutable metadata identifying a log: the public key allowed to append
/// to it, plus optional user-supplied metadata.
///
/// The log's `LogID` is the strong hash of this header (see `LogState::new`
/// and the verification in `try_apply_diff`).
#[derive(Clone, Serialize, Deserialize)]
pub struct LogHeader {
    /// Public ID of the signer whose signatures are accepted on appends.
    pub appender: SignerID,
    /// Optional application-defined metadata, stored as a litl value.
    pub meta: Option<litl::Val>,
}

impl_debug_as_litl!(LogHeader);
26
/// In-memory state of an append-only log: its identity, header, entries,
/// and the running hash used to verify appended diffs.
#[derive(Debug)]
pub struct LogState {
    pub id: LogID,
    /// `None` until a header-carrying diff has been applied (or the log was
    /// created locally via `new`).
    pub header: Option<LogHeader>,
    pub entries: Vec<litl::Val>,
    /// Signed hash covering all current entries; `None` until the first
    /// append is applied.
    pub last_hash: Option<Signed<StrongHash>>,
    // Incremental hasher fed the canonical serialization of every entry in
    // order; cloned to check candidate appends without mutating state.
    hasher: StrongHasher,
}
35
36impl LogState {
37    pub(crate) fn new_empty(id: LogID) -> Self {
38        Self {
39            id,
40            header: None,
41            entries: Vec::new(),
42            last_hash: None,
43            hasher: StrongHasher::default(),
44        }
45    }
46
47    pub(crate) fn new<M: Serialize>(meta: Option<M>) -> (Self, LogWriteAccess) {
48        let signer_secret = SignerSecret::new_random();
49
50        let header = LogHeader {
51            appender: signer_secret.pub_id(),
52            meta: meta.map(|meta| litl::to_val(&meta).unwrap()),
53        };
54
55        let id = LogID(HashOf::hash(&header));
56
57        (
58            Self {
59                id,
60                header: Some(header),
61                entries: Vec::new(),
62                last_hash: None,
63                hasher: StrongHasher::default(),
64            },
65            LogWriteAccess(signer_secret),
66        )
67    }
68
69    pub(crate) fn diff_for_new_entry(
70        &self,
71        entry: litl::Val,
72        write_access: &LogWriteAccess,
73    ) -> LogDiff {
74        let mut hasher = self.hasher.clone();
75        hasher.update(&litl::to_vec_canonical(&entry).unwrap());
76
77        LogDiff {
78            id: self.id,
79            header: None,
80            after: self.entries.len(),
81            new_entries: vec![entry],
82            new_hash: Some(write_access.0.sign(hasher.finalize())),
83        }
84    }
85}
86
/// Identifier of a log: the strong hash of its `LogHeader`.
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
pub struct LogID(pub HashOf<LogHeader>);
89
90impl NestedTaggedData for LogID {
91    const TAG: &'static str = "log";
92
93    type Inner = HashOf<LogHeader>;
94
95    fn as_inner(&self) -> &Self::Inner {
96        &self.0
97    }
98
99    fn from_inner(inner: Self::Inner) -> Self
100    where
101        Self: Sized,
102    {
103        LogID(inner)
104    }
105}
106
107impl_nested_tagged_data_serde!(LogID);
108impl_debug_as_litl!(LogID);
109
110pub struct LogWriteAccess(pub(crate) SignerSecret);
111
112impl NestedTaggedData for LogWriteAccess {
113    const TAG: &'static str = "logWriteAccess";
114
115    type Inner = SignerSecret;
116
117    fn as_inner(&self) -> &Self::Inner {
118        &self.0
119    }
120
121    fn from_inner(inner: Self::Inner) -> Self
122    where
123        Self: Sized,
124    {
125        LogWriteAccess(inner)
126    }
127}
128
129impl_nested_tagged_data_serde!(LogWriteAccess);
130impl_debug_as_litl!(LogWriteAccess);
131
/// A replicable change to a log: optionally the header, plus entries
/// appended after position `after`, with a signed hash of the result.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct LogDiff {
    pub id: LogID,
    /// Present only when the receiver is assumed not to know the header yet.
    pub header: Option<LogHeader>,
    /// Number of entries the receiver is assumed to already have.
    pub after: usize,
    pub new_entries: Vec<litl::Val>,
    /// Signed hash over the whole log after applying this diff; `None` for
    /// header-only (initial) diffs.
    pub new_hash: Option<Signed<StrongHash>>,
}
140
141impl TelepathicDiff for LogDiff {
142    type ID = LogID;
143
144    fn id(&self) -> Self::ID {
145        self.id
146    }
147}
148
149pub type LogStateInfo = usize;
150
/// Persisted alongside the entry stream: the signed log hash and the number
/// of entries that hash covers (used by `load` to sanity-check the stream).
#[derive(Serialize, Deserialize)]
pub struct StorageHashAndLen {
    pub hash: Signed<StrongHash>,
    pub len: usize,
}
156
157impl Telepathic for LogState {
158    type ID = LogID;
159    type WriteAccess = LogWriteAccess;
160    type StateInfo = LogStateInfo;
161    type Diff = LogDiff;
162    type Error = LogError;
163
164    fn id(&self) -> Self::ID {
165        self.id
166    }
167
168    fn try_apply_diff(
169        &mut self,
170        diff: LogDiff,
171    ) -> ApplyDiffResult<Self::StateInfo, Self::ID, Self::Diff, Self::Error> {
172        debug_assert_eq!(diff.id, self.id);
173
174        let (header, got_header_first_time) = match (&mut self.header, &diff.header) {
175            (None, None) => {
176                return Ok(None)
177            }
178            (own @ None, Some(diff_header)) => {
179                if HashOf::hash(diff_header) != self.id.0 {
180                    return Err(LogError::InvalidHeaderHash.into());
181                }
182                *own = Some(diff_header.clone());
183                (own.as_ref().unwrap(), true)
184            }
185            (Some(own_header), _) => (&*own_header, false),
186        };
187
188        if diff.after == 0 && diff.new_entries.is_empty() {
189            if self.entries.is_empty() {
190                return Ok(Some(ApplyDiffSuccess {
191                    new_state_info: 0,
192                    effective_diff: LogDiff {
193                        id: diff.id,
194                        header: if got_header_first_time {
195                            self.header.clone()
196                        } else {
197                            None
198                        },
199                        after: 0,
200                        new_entries: vec![],
201                        new_hash: None,
202                    },
203                }));
204            } else {
205                return Err(ApplyDiffErrorFor::InvalidKnownStateAssumption(
206                    ObjectID::Log(self.id),
207                    "Got initial diff for non-empty log".to_owned(),
208                ));
209            }
210        }
211
212        let new_hash = diff.new_hash.as_ref().expect("Expected new hash on append");
213        new_hash
214            .ensure_signed_by(&header.appender)
215            .map_err(LogError::InvalidSignature)?;
216
217        if diff.after > self.entries.len() {
218            return Err(ApplyDiffErrorFor::InvalidKnownStateAssumption(
219                ObjectID::Log(self.id),
220                "Got diff later than current log length".to_owned(),
221            ));
222        }
223
224        if diff.after + diff.new_entries.len() <= self.entries.len() {
225            info!(
226                after = diff.after,
227                new_entries = diff.new_entries.len(),
228                self_len = self.entries.len(),
229                "Received completely redundant log update"
230            );
231            return Ok(None)
232        }
233
234        let overlap = self.entries.len() - diff.after;
235        let data_len_before = self.entries.len();
236        let effective_new_entries = &diff.new_entries[overlap..];
237
238        let mut new_hasher = self.hasher.clone();
239        for entry in &effective_new_entries[..] {
240            new_hasher.update(&litl::to_vec_canonical(&entry).unwrap());
241        }
242
243        if new_hasher.finalize() == new_hash.attested {
244            self.entries.extend(effective_new_entries.to_vec());
245            self.last_hash = Some(new_hash.clone());
246            self.hasher = new_hasher;
247
248            let effective_diff = if overlap > 0 {
249                info!(
250                    overlap = overlap,
251                    after = diff.after,
252                    self_len = data_len_before,
253                    new_entries = diff.new_entries.len(),
254                    effective_new_entries = effective_new_entries.len(),
255                    "Received redundant log update"
256                );
257
258                LogDiff {
259                    id: diff.id,
260                    header: if got_header_first_time {
261                        self.header.clone()
262                    } else {
263                        None
264                    },
265                    after: data_len_before,
266                    new_entries: effective_new_entries.to_vec(),
267                    new_hash: diff.new_hash,
268                }
269            } else {
270                LogDiff {
271                    header: if got_header_first_time {
272                        self.header.clone()
273                    } else {
274                        None
275                    },
276                    ..diff
277                }
278            };
279
280            Ok(Some(ApplyDiffSuccess {
281                new_state_info: self.entries.len(),
282                effective_diff,
283            }))
284        } else {
285            Err(ApplyDiffErrorFor::Other(LogError::InvalidHash))
286        }
287    }
288
289    fn diff_since(&self, state_info: Option<&Self::StateInfo>) -> Option<Self::Diff> {
290        let (has_header, state_len) = match state_info {
291            None => (false, 0),
292            Some(len) => (true, *len as usize),
293        };
294
295        if state_len >= self.entries.len() && has_header {
296            None
297        } else {
298            Some(LogDiff {
299                id: self.id,
300                header: if has_header {
301                    None
302                } else {
303                    self.header.clone()
304                },
305                after: state_len,
306                new_entries: self.entries[state_len..].to_vec(),
307                new_hash: self.last_hash.clone(),
308            })
309        }
310    }
311
312    fn state_info(&self) -> Option<Self::StateInfo> {
313        if self.header.is_none() {
314            None
315        } else {
316            Some(self.entries.len())
317        }
318    }
319
    /// Streams the persisted log back as diffs: first a header-only diff (if
    /// a header was stored), then one diff carrying all stored entries.
    fn load(
        id: LogID,
        storage: Box<dyn StorageBackend>,
    ) -> std::pin::Pin<Box<dyn futures::Stream<Item = Self::Diff>>> {
        let key = id.to_string();

        // Replay the stored header as a header-only diff so the normal
        // `try_apply_diff` path can adopt and verify it. Missing or
        // undecodable headers are logged and skipped (empty stream).
        let header_stream = {
            let key = key.clone();
            stream::once(storage.get_key(&format!("header_{}", key))).filter_map(
                move |maybe_bytes| {
                    future::ready(maybe_bytes.and_then(|bytes| {
                        litl::from_slice(&bytes)
                            .map_err(|err| {
                                error!(err = ?err, id = ?id, "Failed to read loaded log header");
                                err
                            })
                            .ok()
                            .map(|header| LogDiff {
                                id,
                                header: Some(header),
                                after: 0,
                                new_entries: vec![],
                                new_hash: None,
                            })
                    }))
                },
            )
        };

        // Replay all stored entries as one diff, using the persisted
        // hash-and-len record both as the diff's signed hash and as a sanity
        // check on how much data the backend returns.
        let all_chunks_stream = {
            stream::once(storage.get_key(&format!("hash_and_len_{}", key)))
                .filter_map(move |maybe_hash_and_len_bytes| {
                    future::ready(maybe_hash_and_len_bytes.and_then(|hash_and_len_bytes| {
                        litl::from_slice::<StorageHashAndLen>(&hash_and_len_bytes).map_err(|err| {
                            error!(err = ?err, id = ?id, "Failed to read loaded log hash and len");
                            err
                        }).ok()
                    }))
                })
                .filter_map(move |hash_and_len| {
                    let storage = storage.clone_ref();
                    let key = key.clone();
                    async move {
                        // NOTE(review): entries are trusted to decode here
                        // (`unwrap`) — presumably we only ever store valid
                        // litl values; confirm against `store`.
                        let all_items = storage
                            .get_stream(&key)
                            .map(|data|litl::from_slice::<litl::Val>(&data).unwrap())
                            .collect::<Vec<_>>()
                            .await
                           ;

                        if all_items.len() < hash_and_len.len {
                            // Backend lost data relative to the last recorded
                            // hash; emit nothing rather than an unverifiable
                            // diff.
                            error!(id = ?id, "Storage backend returned less data than last hash");
                            None
                        } else {
                            Some(LogDiff {
                                id,
                                header: None,
                                after: 0,
                                // Truncate to the hashed length in case the
                                // stream holds extra trailing items.
                                new_entries: all_items[0..hash_and_len.len].to_vec(),
                                new_hash: Some(hash_and_len.hash),
                            })
                        }
                    }
                })
        };

        header_stream.chain(all_chunks_stream).boxed_local()
    }
388
    /// Persists an effective diff: the header under `header_<id>`, each new
    /// entry appended to the entry stream, and the signed hash plus covered
    /// length under `hash_and_len_<id>` (read back by `load`).
    fn store(
        effective_diff: LogDiff,
        storage: Box<dyn StorageBackend>,
    ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()>>> {
        let key = effective_diff.id.to_string();
        async move {
            // Persist the header whenever the diff carries one.
            if let Some(header) = &effective_diff.header {
                storage
                    .set_key(&format!("header_{}", key), litl::to_vec(header).unwrap())
                    .await;
            }

            if let Some(new_hash) = &effective_diff.new_hash {
                // A hash with no new entries means there is nothing to
                // append; the stored hash-and-len record is left untouched.
                if effective_diff.new_entries.is_empty() {
                    return;
                }
                let new_len = effective_diff.after + effective_diff.new_entries.len();
                for entry in effective_diff.new_entries {
                    storage
                        .append_to_stream(&key, litl::to_vec(&entry).unwrap(), None)
                        .await;
                }

                // Record the signed hash and the entry count it covers so
                // `load` can sanity-check the stream on restart.
                storage
                    .set_key(
                        &format!("hash_and_len_{}", key),
                        litl::to_vec(&StorageHashAndLen {
                            hash: new_hash.clone(),
                            len: new_len,
                        })
                        .unwrap(),
                    )
                    .await;
            }
        }
        .boxed_local()
    }
426}
427
/// Errors specific to validating and applying log diffs.
#[derive(Error, Debug)]
pub enum LogError {
    /// A diff's header does not hash to the log's ID.
    #[error("Invalid header hash")]
    InvalidHeaderHash,
    /// The signed hash carried by an append diff does not match the hash
    /// recomputed over the resulting log.
    #[error("Invalid hash after append")]
    InvalidHash,
    /// The hash was not signed by the log's designated appender.
    #[error(transparent)]
    InvalidSignature(#[from] SignatureError),
    // NOTE(review): not constructed anywhere in this file — presumably used
    // by callers elsewhere; verify before removing.
    #[error("Append out of order")]
    AppendOutOfOrder,
}