1use std::{collections::HashMap, fmt::Debug};
2
3use audi::{Listeners, Listener};
4use conundrum::{
5 hashing::{RawHash, RawHasher},
6 purpose,
7 signing::{SignPublicKey, Signed, SignatureError, SignKeyPair},
8};
9use litl::{impl_debug_as_litl};
10use serde_derive::{Serialize, Deserialize};
11
12use crate::{UpdateSource, KEEP_UNKNOWN};
13
14purpose!(LogAppend);
15
16#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
17#[serde(rename = "Telepathy/LogID")]
18pub struct LogID(SignPublicKey<LogAppend>);
19
20impl_debug_as_litl!(LogID);
21
22pub struct LogWriteAccess(SignKeyPair<LogAppend>);
23
24impl LogWriteAccess {
25 pub fn id(&self) -> LogID {
26 LogID(self.0.public())
27 }
28}
29
30#[derive(Default)]
31pub struct Logs {
32 logs: HashMap<LogID, LogState>,
33}
34
35#[derive(Default)]
36struct LogState {
37 priority: u8,
38 data: Vec<u8>,
39 last_hash: Option<Signed<RawHash, LogAppend>>,
40 hasher: RawHasher,
41 listeners: Listeners<(LogAppendMessage, UpdateSource)>,
42}
43
44#[derive(Clone, Serialize, Deserialize)]
45pub struct LogAppendMessage {
46 pub log_id: LogID,
47 pub after: usize,
48 #[serde(with="serde_bytes")]
49 pub append: Vec<u8>,
50 pub new_hash: Signed<RawHash, LogAppend>,
51}
52
53impl_debug_as_litl!(LogAppendMessage);
54
55#[derive(Copy, Clone, Default, Debug, Serialize, Deserialize)]
56pub struct LogKnownState {
57 pub log_len: usize,
58 pub priority: u8,
59}
60
61impl LogKnownState {
62 pub fn update_optimistically(&mut self, append: LogAppendMessage) {
63 if append.after == self.log_len {
64 self.log_len += append.append.len();
65 }
66 }
67}
68
69use thiserror::Error;
70
71#[derive(Error, Debug)]
72pub enum LogError {
73 #[error("Invalid hash after append")]
74 InvalidHash,
75 #[error(transparent)]
76 InvalidSignature(#[from] SignatureError)
77}
78
79impl Logs {
80 pub fn create_log(&mut self) -> LogWriteAccess {
81 let keypair = SignKeyPair::new_random();
82
83 match self.logs.entry(LogID(keypair.public())) {
84 std::collections::hash_map::Entry::Occupied(_) => unreachable!("Should never have existing log for new keypair"),
85 std::collections::hash_map::Entry::Vacant(entry) => entry.insert(LogState::default()),
86 };
87
88 LogWriteAccess(keypair)
89 }
90
91 pub fn append(&mut self, access: &LogWriteAccess, data: &[u8]) -> Result<(), LogError> {
93 let log_state = self.logs.get(&access.id()).expect("Log should be created or loaded when appending");
94
95 let mut new_hasher = log_state.hasher.clone();
96 new_hasher.update(data);
97
98 let current_len = log_state.data.len();
99
100 self.accept_append(&LogAppendMessage {
101 log_id: access.id(),
102 after: current_len,
103 append: data.to_vec(),
104 new_hash: access.0.sign(new_hasher.finalize())
105 }, UpdateSource::CreatedLocally)
106 }
107
108 pub fn current_data(&self, log_id: &LogID) -> Option<&[u8]> {
109 self.logs.get(log_id).map(|log_state| log_state.data.as_slice())
110 }
111
112 pub(crate) fn accept_append(
113 &mut self,
114 msg: &LogAppendMessage,
115 source: UpdateSource,
116 ) -> Result<(), LogError> {
117 msg.new_hash.ensure_signed_by(&msg.log_id.0)?;
118 let log = if KEEP_UNKNOWN {
119 self.logs.entry(msg.log_id).or_default()
120 } else if let Some(log) = self.logs.get_mut(&msg.log_id) {
121 log
122 } else {
123 return Ok(());
125 };
126 let mut new_hasher = log.hasher.clone();
127 new_hasher.update(&msg.append);
128 if new_hasher.finalize() == msg.new_hash.verified {
129 log.data.extend(&msg.append);
130 log.last_hash = Some(msg.new_hash.clone());
131 log.hasher = new_hasher;
132
133 log.listeners.emit((msg.clone(), source));
134
135 Ok(())
136 } else {
137 Err(LogError::InvalidHash)
138 }
139 }
140
141 pub(crate) fn all_log_ids(&self) -> impl Iterator<Item = &LogID> {
142 self.logs.keys()
143 }
144
145 pub(crate) fn known_state(&self, log_id: &LogID) -> Option<LogKnownState> {
146 self.logs.get(log_id).map(|log_state| LogKnownState {
147 log_len: log_state.data.len(),
148 priority: log_state.priority,
149 })
150 }
151
152 pub(crate) fn get_append_since(
153 &self,
154 log_id: &LogID,
155 known_state: Option<&LogKnownState>,
156 ) -> Option<LogAppendMessage> {
157 let known_len = known_state.map(|state| state.log_len).unwrap_or(0);
158 self.logs.get(log_id).and_then(|log| {
159 if log.data.len() > known_len {
160 Some(LogAppendMessage {
161 log_id: *log_id,
162 after: known_len,
163 append: log.data[known_len..].to_vec(),
164 new_hash: log
165 .last_hash
166 .as_ref()
167 .expect("Should have last_hash with data")
168 .clone(),
169 })
170 } else {
171 None
172 }
173 })
174 }
175
176 pub fn add_listener(
177 &mut self,
178 log_id: LogID,
179 listener: Box<dyn Listener<(LogAppendMessage, UpdateSource)>>,
180 ) {
181 let log = self.logs.entry(log_id).or_default();
182 log.listeners.add_with_initial_msg(
183 listener,
184 if !log.data.is_empty() {
185 Some((
186 LogAppendMessage {
187 append: log.data.clone(),
188 log_id,
189 after: 0,
190 new_hash: log
191 .last_hash
192 .as_ref()
193 .expect("Should have last_hash with data")
194 .clone(),
195 },
196 UpdateSource::CurrentState,
197 ))
198 } else {
199 None
200 },
201 )
202 }
203}