1use futures::{future, stream, FutureExt, StreamExt};
2use litl::{impl_debug_as_litl, impl_nested_tagged_data_serde, NestedTaggedData};
3use ridl::{
4 hashing::{HashOf, StrongHash, StrongHasher},
5 signing::{SignatureError, Signed, SignerID, SignerSecret},
6};
7use serde::Serialize;
8use serde_derive::{Deserialize, Serialize};
9use thiserror::Error;
10use tracing::{debug, error, info};
11
12use crate::{
13 telepathic::{
14 ApplyDiffErrorFor, ApplyDiffResult, ApplyDiffSuccess, Telepathic, TelepathicDiff,
15 },
16 ObjectID, StorageBackend,
17};
18
19#[derive(Clone, Serialize, Deserialize)]
20pub struct LogHeader {
21 pub appender: SignerID,
22 pub meta: Option<litl::Val>,
23}
24
25impl_debug_as_litl!(LogHeader);
26
27#[derive(Debug)]
28pub struct LogState {
29 pub id: LogID,
30 pub header: Option<LogHeader>,
31 pub entries: Vec<litl::Val>,
32 pub last_hash: Option<Signed<StrongHash>>,
33 hasher: StrongHasher,
34}
35
36impl LogState {
37 pub(crate) fn new_empty(id: LogID) -> Self {
38 Self {
39 id,
40 header: None,
41 entries: Vec::new(),
42 last_hash: None,
43 hasher: StrongHasher::default(),
44 }
45 }
46
47 pub(crate) fn new<M: Serialize>(meta: Option<M>) -> (Self, LogWriteAccess) {
48 let signer_secret = SignerSecret::new_random();
49
50 let header = LogHeader {
51 appender: signer_secret.pub_id(),
52 meta: meta.map(|meta| litl::to_val(&meta).unwrap()),
53 };
54
55 let id = LogID(HashOf::hash(&header));
56
57 (
58 Self {
59 id,
60 header: Some(header),
61 entries: Vec::new(),
62 last_hash: None,
63 hasher: StrongHasher::default(),
64 },
65 LogWriteAccess(signer_secret),
66 )
67 }
68
69 pub(crate) fn diff_for_new_entry(
70 &self,
71 entry: litl::Val,
72 write_access: &LogWriteAccess,
73 ) -> LogDiff {
74 let mut hasher = self.hasher.clone();
75 hasher.update(&litl::to_vec_canonical(&entry).unwrap());
76
77 LogDiff {
78 id: self.id,
79 header: None,
80 after: self.entries.len(),
81 new_entries: vec![entry],
82 new_hash: Some(write_access.0.sign(hasher.finalize())),
83 }
84 }
85}
86
87#[derive(Copy, Clone, PartialEq, Eq, Hash)]
88pub struct LogID(pub HashOf<LogHeader>);
89
90impl NestedTaggedData for LogID {
91 const TAG: &'static str = "log";
92
93 type Inner = HashOf<LogHeader>;
94
95 fn as_inner(&self) -> &Self::Inner {
96 &self.0
97 }
98
99 fn from_inner(inner: Self::Inner) -> Self
100 where
101 Self: Sized,
102 {
103 LogID(inner)
104 }
105}
106
107impl_nested_tagged_data_serde!(LogID);
108impl_debug_as_litl!(LogID);
109
110pub struct LogWriteAccess(pub(crate) SignerSecret);
111
112impl NestedTaggedData for LogWriteAccess {
113 const TAG: &'static str = "logWriteAccess";
114
115 type Inner = SignerSecret;
116
117 fn as_inner(&self) -> &Self::Inner {
118 &self.0
119 }
120
121 fn from_inner(inner: Self::Inner) -> Self
122 where
123 Self: Sized,
124 {
125 LogWriteAccess(inner)
126 }
127}
128
129impl_nested_tagged_data_serde!(LogWriteAccess);
130impl_debug_as_litl!(LogWriteAccess);
131
132#[derive(Clone, Serialize, Deserialize, Debug)]
133pub struct LogDiff {
134 pub id: LogID,
135 pub header: Option<LogHeader>,
136 pub after: usize,
137 pub new_entries: Vec<litl::Val>,
138 pub new_hash: Option<Signed<StrongHash>>,
139}
140
141impl TelepathicDiff for LogDiff {
142 type ID = LogID;
143
144 fn id(&self) -> Self::ID {
145 self.id
146 }
147}
148
149pub type LogStateInfo = usize;
150
151#[derive(Serialize, Deserialize)]
152pub struct StorageHashAndLen {
153 pub hash: Signed<StrongHash>,
154 pub len: usize,
155}
156
157impl Telepathic for LogState {
158 type ID = LogID;
159 type WriteAccess = LogWriteAccess;
160 type StateInfo = LogStateInfo;
161 type Diff = LogDiff;
162 type Error = LogError;
163
164 fn id(&self) -> Self::ID {
165 self.id
166 }
167
168 fn try_apply_diff(
169 &mut self,
170 diff: LogDiff,
171 ) -> ApplyDiffResult<Self::StateInfo, Self::ID, Self::Diff, Self::Error> {
172 debug_assert_eq!(diff.id, self.id);
173
174 let (header, got_header_first_time) = match (&mut self.header, &diff.header) {
175 (None, None) => {
176 return Ok(None)
177 }
178 (own @ None, Some(diff_header)) => {
179 if HashOf::hash(diff_header) != self.id.0 {
180 return Err(LogError::InvalidHeaderHash.into());
181 }
182 *own = Some(diff_header.clone());
183 (own.as_ref().unwrap(), true)
184 }
185 (Some(own_header), _) => (&*own_header, false),
186 };
187
188 if diff.after == 0 && diff.new_entries.is_empty() {
189 if self.entries.is_empty() {
190 return Ok(Some(ApplyDiffSuccess {
191 new_state_info: 0,
192 effective_diff: LogDiff {
193 id: diff.id,
194 header: if got_header_first_time {
195 self.header.clone()
196 } else {
197 None
198 },
199 after: 0,
200 new_entries: vec![],
201 new_hash: None,
202 },
203 }));
204 } else {
205 return Err(ApplyDiffErrorFor::InvalidKnownStateAssumption(
206 ObjectID::Log(self.id),
207 "Got initial diff for non-empty log".to_owned(),
208 ));
209 }
210 }
211
212 let new_hash = diff.new_hash.as_ref().expect("Expected new hash on append");
213 new_hash
214 .ensure_signed_by(&header.appender)
215 .map_err(LogError::InvalidSignature)?;
216
217 if diff.after > self.entries.len() {
218 return Err(ApplyDiffErrorFor::InvalidKnownStateAssumption(
219 ObjectID::Log(self.id),
220 "Got diff later than current log length".to_owned(),
221 ));
222 }
223
224 if diff.after + diff.new_entries.len() <= self.entries.len() {
225 info!(
226 after = diff.after,
227 new_entries = diff.new_entries.len(),
228 self_len = self.entries.len(),
229 "Received completely redundant log update"
230 );
231 return Ok(None)
232 }
233
234 let overlap = self.entries.len() - diff.after;
235 let data_len_before = self.entries.len();
236 let effective_new_entries = &diff.new_entries[overlap..];
237
238 let mut new_hasher = self.hasher.clone();
239 for entry in &effective_new_entries[..] {
240 new_hasher.update(&litl::to_vec_canonical(&entry).unwrap());
241 }
242
243 if new_hasher.finalize() == new_hash.attested {
244 self.entries.extend(effective_new_entries.to_vec());
245 self.last_hash = Some(new_hash.clone());
246 self.hasher = new_hasher;
247
248 let effective_diff = if overlap > 0 {
249 info!(
250 overlap = overlap,
251 after = diff.after,
252 self_len = data_len_before,
253 new_entries = diff.new_entries.len(),
254 effective_new_entries = effective_new_entries.len(),
255 "Received redundant log update"
256 );
257
258 LogDiff {
259 id: diff.id,
260 header: if got_header_first_time {
261 self.header.clone()
262 } else {
263 None
264 },
265 after: data_len_before,
266 new_entries: effective_new_entries.to_vec(),
267 new_hash: diff.new_hash,
268 }
269 } else {
270 LogDiff {
271 header: if got_header_first_time {
272 self.header.clone()
273 } else {
274 None
275 },
276 ..diff
277 }
278 };
279
280 Ok(Some(ApplyDiffSuccess {
281 new_state_info: self.entries.len(),
282 effective_diff,
283 }))
284 } else {
285 Err(ApplyDiffErrorFor::Other(LogError::InvalidHash))
286 }
287 }
288
289 fn diff_since(&self, state_info: Option<&Self::StateInfo>) -> Option<Self::Diff> {
290 let (has_header, state_len) = match state_info {
291 None => (false, 0),
292 Some(len) => (true, *len as usize),
293 };
294
295 if state_len >= self.entries.len() && has_header {
296 None
297 } else {
298 Some(LogDiff {
299 id: self.id,
300 header: if has_header {
301 None
302 } else {
303 self.header.clone()
304 },
305 after: state_len,
306 new_entries: self.entries[state_len..].to_vec(),
307 new_hash: self.last_hash.clone(),
308 })
309 }
310 }
311
312 fn state_info(&self) -> Option<Self::StateInfo> {
313 if self.header.is_none() {
314 None
315 } else {
316 Some(self.entries.len())
317 }
318 }
319
320 fn load(
321 id: LogID,
322 storage: Box<dyn StorageBackend>,
323 ) -> std::pin::Pin<Box<dyn futures::Stream<Item = Self::Diff>>> {
324 let key = id.to_string();
325
326 let header_stream = {
327 let key = key.clone();
328 stream::once(storage.get_key(&format!("header_{}", key))).filter_map(
329 move |maybe_bytes| {
330 future::ready(maybe_bytes.and_then(|bytes| {
331 litl::from_slice(&bytes)
332 .map_err(|err| {
333 error!(err = ?err, id = ?id, "Failed to read loaded log header");
334 err
335 })
336 .ok()
337 .map(|header| LogDiff {
338 id,
339 header: Some(header),
340 after: 0,
341 new_entries: vec![],
342 new_hash: None,
343 })
344 }))
345 },
346 )
347 };
348
349 let all_chunks_stream = {
350 stream::once(storage.get_key(&format!("hash_and_len_{}", key)))
351 .filter_map(move |maybe_hash_and_len_bytes| {
352 future::ready(maybe_hash_and_len_bytes.and_then(|hash_and_len_bytes| {
353 litl::from_slice::<StorageHashAndLen>(&hash_and_len_bytes).map_err(|err| {
354 error!(err = ?err, id = ?id, "Failed to read loaded log hash and len");
355 err
356 }).ok()
357 }))
358 })
359 .filter_map(move |hash_and_len| {
360 let storage = storage.clone_ref();
361 let key = key.clone();
362 async move {
363 let all_items = storage
364 .get_stream(&key)
365 .map(|data|litl::from_slice::<litl::Val>(&data).unwrap())
366 .collect::<Vec<_>>()
367 .await
368 ;
369
370 if all_items.len() < hash_and_len.len {
371 error!(id = ?id, "Storage backend returned less data than last hash");
372 None
373 } else {
374 Some(LogDiff {
375 id,
376 header: None,
377 after: 0,
378 new_entries: all_items[0..hash_and_len.len].to_vec(),
379 new_hash: Some(hash_and_len.hash),
380 })
381 }
382 }
383 })
384 };
385
386 header_stream.chain(all_chunks_stream).boxed_local()
387 }
388
389 fn store(
390 effective_diff: LogDiff,
391 storage: Box<dyn StorageBackend>,
392 ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()>>> {
393 let key = effective_diff.id.to_string();
394 async move {
395 if let Some(header) = &effective_diff.header {
396 storage
397 .set_key(&format!("header_{}", key), litl::to_vec(header).unwrap())
398 .await;
399 }
400
401 if let Some(new_hash) = &effective_diff.new_hash {
402 if effective_diff.new_entries.is_empty() {
403 return;
404 }
405 let new_len = effective_diff.after + effective_diff.new_entries.len();
406 for entry in effective_diff.new_entries {
407 storage
408 .append_to_stream(&key, litl::to_vec(&entry).unwrap(), None)
409 .await;
410 }
411
412 storage
413 .set_key(
414 &format!("hash_and_len_{}", key),
415 litl::to_vec(&StorageHashAndLen {
416 hash: new_hash.clone(),
417 len: new_len,
418 })
419 .unwrap(),
420 )
421 .await;
422 }
423 }
424 .boxed_local()
425 }
426}
427
428#[derive(Error, Debug)]
429pub enum LogError {
430 #[error("Invalid header hash")]
431 InvalidHeaderHash,
432 #[error("Invalid hash after append")]
433 InvalidHash,
434 #[error(transparent)]
435 InvalidSignature(#[from] SignatureError),
436 #[error("Append out of order")]
437 AppendOutOfOrder,
438}