jazz_telepathy/
logs.rs

1use std::{collections::HashMap, fmt::Debug};
2
3use audi::{Listeners, Listener};
4use conundrum::{
5    hashing::{RawHash, RawHasher},
6    purpose,
7    signing::{SignPublicKey, Signed, SignatureError, SignKeyPair},
8};
9use litl::{impl_debug_as_litl};
10use serde_derive::{Serialize, Deserialize};
11
12use crate::{UpdateSource, KEEP_UNKNOWN};
13
14purpose!(LogAppend);
15
16#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
17#[serde(rename = "Telepathy/LogID")]
18pub struct LogID(SignPublicKey<LogAppend>);
19
20impl_debug_as_litl!(LogID);
21
22pub struct LogWriteAccess(SignKeyPair<LogAppend>);
23
24impl LogWriteAccess {
25    pub fn id(&self) -> LogID {
26        LogID(self.0.public())
27    }
28}
29
30#[derive(Default)]
31pub struct Logs {
32    logs: HashMap<LogID, LogState>,
33}
34
35#[derive(Default)]
36struct LogState {
37    priority: u8,
38    data: Vec<u8>,
39    last_hash: Option<Signed<RawHash, LogAppend>>,
40    hasher: RawHasher,
41    listeners: Listeners<(LogAppendMessage, UpdateSource)>,
42}
43
44#[derive(Clone, Serialize, Deserialize)]
45pub struct LogAppendMessage {
46    pub log_id: LogID,
47    pub after: usize,
48    #[serde(with="serde_bytes")]
49    pub append: Vec<u8>,
50    pub new_hash: Signed<RawHash, LogAppend>,
51}
52
53impl_debug_as_litl!(LogAppendMessage);
54
55#[derive(Copy, Clone, Default, Debug, Serialize, Deserialize)]
56pub struct LogKnownState {
57    pub log_len: usize,
58    pub priority: u8,
59}
60
61impl LogKnownState {
62    pub fn update_optimistically(&mut self, append: LogAppendMessage) {
63        if append.after == self.log_len {
64            self.log_len += append.append.len();
65        }
66    }
67}
68
69use thiserror::Error;
70
71#[derive(Error, Debug)]
72pub enum LogError {
73    #[error("Invalid hash after append")]
74    InvalidHash,
75    #[error(transparent)]
76    InvalidSignature(#[from] SignatureError)
77}
78
79impl Logs {
80    pub fn create_log(&mut self) -> LogWriteAccess {
81        let keypair = SignKeyPair::new_random();
82
83        match self.logs.entry(LogID(keypair.public())) {
84            std::collections::hash_map::Entry::Occupied(_) => unreachable!("Should never have existing log for new keypair"),
85            std::collections::hash_map::Entry::Vacant(entry) => entry.insert(LogState::default()),
86        };
87
88        LogWriteAccess(keypair)
89    }
90
91    // TODO(correctness): Somehow make sure only one consumer can append
92    pub fn append(&mut self, access: &LogWriteAccess, data: &[u8]) -> Result<(), LogError> {
93        let log_state = self.logs.get(&access.id()).expect("Log should be created or loaded when appending");
94
95        let mut new_hasher = log_state.hasher.clone();
96        new_hasher.update(data);
97
98        let current_len = log_state.data.len();
99
100        self.accept_append(&LogAppendMessage {
101            log_id: access.id(),
102            after: current_len,
103            append: data.to_vec(),
104            new_hash: access.0.sign(new_hasher.finalize())
105        }, UpdateSource::CreatedLocally)
106    }
107
108    pub fn current_data(&self, log_id: &LogID) -> Option<&[u8]> {
109        self.logs.get(log_id).map(|log_state| log_state.data.as_slice())
110    }
111
112    pub(crate) fn accept_append(
113        &mut self,
114        msg: &LogAppendMessage,
115        source: UpdateSource,
116    ) -> Result<(), LogError> {
117        msg.new_hash.ensure_signed_by(&msg.log_id.0)?;
118        let log = if KEEP_UNKNOWN {
119            self.logs.entry(msg.log_id).or_default()
120        } else if let Some(log) = self.logs.get_mut(&msg.log_id) {
121            log
122        } else {
123            // ignore unknown log messages
124            return Ok(());
125        };
126        let mut new_hasher = log.hasher.clone();
127        new_hasher.update(&msg.append);
128        if new_hasher.finalize() == msg.new_hash.verified {
129            log.data.extend(&msg.append);
130            log.last_hash = Some(msg.new_hash.clone());
131            log.hasher = new_hasher;
132
133            log.listeners.emit((msg.clone(), source));
134
135            Ok(())
136        } else {
137            Err(LogError::InvalidHash)
138        }
139    }
140
141    pub(crate) fn all_log_ids(&self) -> impl Iterator<Item = &LogID> {
142        self.logs.keys()
143    }
144
145    pub(crate) fn known_state(&self, log_id: &LogID) -> Option<LogKnownState> {
146        self.logs.get(log_id).map(|log_state| LogKnownState {
147            log_len: log_state.data.len(),
148            priority: log_state.priority,
149        })
150    }
151
152    pub(crate) fn get_append_since(
153        &self,
154        log_id: &LogID,
155        known_state: Option<&LogKnownState>,
156    ) -> Option<LogAppendMessage> {
157        let known_len = known_state.map(|state| state.log_len).unwrap_or(0);
158        self.logs.get(log_id).and_then(|log| {
159            if log.data.len() > known_len {
160                Some(LogAppendMessage {
161                    log_id: *log_id,
162                    after: known_len,
163                    append: log.data[known_len..].to_vec(),
164                    new_hash: log
165                        .last_hash
166                        .as_ref()
167                        .expect("Should have last_hash with data")
168                        .clone(),
169                })
170            } else {
171                None
172            }
173        })
174    }
175
176    pub fn add_listener(
177        &mut self,
178        log_id: LogID,
179        listener: Box<dyn Listener<(LogAppendMessage, UpdateSource)>>,
180    ) {
181        let log = self.logs.entry(log_id).or_default();
182        log.listeners.add_with_initial_msg(
183            listener,
184            if !log.data.is_empty() {
185                Some((
186                    LogAppendMessage {
187                        append: log.data.clone(),
188                        log_id,
189                        after: 0,
190                        new_hash: log
191                            .last_hash
192                            .as_ref()
193                            .expect("Should have last_hash with data")
194                            .clone(),
195                    },
196                    UpdateSource::CurrentState,
197                ))
198            } else {
199                None
200            },
201        )
202    }
203}