use std::{collections::HashMap, fmt::Debug};
use audi::{Listeners, Listener};
use conundrum::{
hashing::{RawHash, RawHasher},
purpose,
signing::{SignPublicKey, Signed, SignatureError, SignKeyPair},
};
use litl::{impl_debug_as_litl};
use serde_derive::{Serialize, Deserialize};
use crate::{UpdateSource, KEEP_UNKNOWN};
// Declares `LogAppend` via conundrum's `purpose!` macro. In this file it is used
// only as the type parameter that tags the key pair, public key and signatures
// belonging to log appends (`SignKeyPair<LogAppend>`, `Signed<_, LogAppend>`, ...).
// NOTE(review): the exact expansion lives in `conundrum` — confirm there.
purpose!(LogAppend);
/// Identifier of a log.
///
/// A newtype around the public half of the log's signing key pair; appends to
/// the log are checked against this key (see `Logs::accept_append`).
/// Serialized under the serde name `Telepathy/LogID`.
#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename = "Telepathy/LogID")]
pub struct LogID(SignPublicKey<LogAppend>);
// `Debug` for `LogID` is generated by litl's `impl_debug_as_litl!` macro rather than derived.
impl_debug_as_litl!(LogID);
/// Write capability for a single log.
///
/// Wraps the signing key pair whose public half is the log's [`LogID`];
/// `Logs::append` uses it to sign the hash of every local append.
pub struct LogWriteAccess(SignKeyPair<LogAppend>);

impl LogWriteAccess {
    /// Returns the [`LogID`] of the log this access token can write to,
    /// i.e. the public key of the wrapped key pair.
    pub fn id(&self) -> LogID {
        let Self(keypair) = self;
        LogID(keypair.public())
    }
}
/// In-memory collection of logs, keyed by [`LogID`].
///
/// `Default` yields an empty collection.
#[derive(Default)]
pub struct Logs {
/// State of every log currently held, looked up by its ID.
logs: HashMap<LogID, LogState>,
}
/// Everything kept for a single log.
///
/// `Default` is the state of a log with no data yet (empty `data`, no `last_hash`).
#[derive(Default)]
struct LogState {
/// Priority reported through `Logs::known_state`. Nothing in this file writes it
/// after construction. NOTE(review): its interpretation is defined elsewhere.
priority: u8,
/// All bytes accepted into the log so far, in append order.
data: Vec<u8>,
/// Signed hash carried by the most recently accepted append; `None` until the first append.
last_hash: Option<Signed<RawHash, LogAppend>>,
/// Running hasher that has been fed every accepted append; cloned to validate the next one.
hasher: RawHasher,
/// Listeners that receive each accepted append together with where it came from.
listeners: Listeners<(LogAppendMessage, UpdateSource)>,
}
/// A chunk of bytes to be appended to a log, plus the signed hash that authenticates it.
#[derive(Clone, Serialize, Deserialize)]
pub struct LogAppendMessage {
/// Log the bytes belong to.
pub log_id: LogID,
/// Length of the log before this append, i.e. the offset at which `append` starts.
pub after: usize,
/// The bytes to append (serialized through `serde_bytes`).
#[serde(with="serde_bytes")]
pub append: Vec<u8>,
/// Signed output of the log's running hasher after `append` has been fed into it.
pub new_hash: Signed<RawHash, LogAppend>,
}
// `Debug` for `LogAppendMessage` is generated by litl's `impl_debug_as_litl!` macro rather than derived.
impl_debug_as_litl!(LogAppendMessage);
/// Compact summary of how much of a log a party holds.
#[derive(Copy, Clone, Default, Debug, Serialize, Deserialize)]
pub struct LogKnownState {
    /// Number of bytes of the log that are known.
    pub log_len: usize,
    /// Priority value of the log, copied from the log's state.
    /// NOTE(review): its interpretation is defined elsewhere.
    pub priority: u8,
}

impl LogKnownState {
    /// Advances `log_len` as if `append` had been applied.
    ///
    /// The update only happens when `append` starts exactly at the currently
    /// known length; any other message leaves the state untouched.
    pub fn update_optimistically(&mut self, append: LogAppendMessage) {
        if append.after != self.log_len {
            return;
        }
        self.log_len += append.append.len();
    }
}
use thiserror::Error;
/// Errors produced when an append to a log is rejected.
#[derive(Error, Debug)]
pub enum LogError {
/// The appended bytes, applied on top of the log's current state, do not
/// produce the hash carried (and signed) in the message.
#[error("Invalid hash after append")]
InvalidHash,
/// The signature check on the message's hash failed. Displayed transparently
/// as the underlying `SignatureError`, which also converts into this variant via `From`.
#[error(transparent)]
InvalidSignature(#[from] SignatureError)
}
impl Logs {
/// Creates a new, empty log backed by a freshly generated random key pair.
///
/// The log is registered in this collection under the key pair's public key,
/// and the returned [`LogWriteAccess`] is what allows appending to it.
///
/// # Panics
///
/// Panics if a log with the new key pair's ID is already present, which is
/// treated as impossible for a random key.
pub fn create_log(&mut self) -> LogWriteAccess {
    use std::collections::hash_map::Entry;

    let keypair = SignKeyPair::new_random();
    let log_id = LogID(keypair.public());

    match self.logs.entry(log_id) {
        Entry::Vacant(slot) => {
            slot.insert(LogState::default());
        }
        Entry::Occupied(_) => {
            unreachable!("Should never have existing log for new keypair")
        }
    }

    LogWriteAccess(keypair)
}
/// Appends `data` to the log that `access` grants write access to.
///
/// Computes the hash the log will have once `data` is added, signs it with the
/// key pair in `access`, and feeds the resulting message through
/// `accept_append` with `UpdateSource::CreatedLocally`.
///
/// # Errors
///
/// Returns whatever `accept_append` returns for the constructed message.
///
/// # Panics
///
/// Panics if the log identified by `access` is not present in this collection.
pub fn append(&mut self, access: &LogWriteAccess, data: &[u8]) -> Result<(), LogError> {
    let log_id = access.id();

    let (after, new_hash) = {
        let state = self
            .logs
            .get(&log_id)
            .expect("Log should be created or loaded when appending");

        // Hash on a copy so the stored hasher only advances once the append is accepted.
        let mut hasher = state.hasher.clone();
        hasher.update(data);

        (state.data.len(), access.0.sign(hasher.finalize()))
    };

    let msg = LogAppendMessage {
        log_id,
        after,
        append: data.to_vec(),
        new_hash,
    };
    self.accept_append(&msg, UpdateSource::CreatedLocally)
}
/// Returns all bytes currently stored for `log_id`, or `None` if the log is
/// not present in this collection.
pub fn current_data(&self, log_id: &LogID) -> Option<&[u8]> {
    let state = self.logs.get(log_id)?;
    Some(state.data.as_slice())
}
/// Validates an append message and, if it is valid, applies it to the log.
///
/// Steps, in order:
/// 1. The signature on `msg.new_hash` is checked against the log's public key.
/// 2. The log is looked up. If it is not present it is created when
///    `KEEP_UNKNOWN` is set; otherwise the message is ignored and `Ok(())`
///    is returned.
/// 3. `msg.after` must equal the log's current length.
/// 4. Feeding `msg.append` into a copy of the log's running hasher must yield
///    the signed hash.
///
/// On success the bytes are appended, the stored hash and hasher are advanced,
/// and listeners are notified with a clone of `msg` and `source`.
///
/// # Errors
///
/// * [`LogError::InvalidSignature`] if the hash is not signed by the log's key.
/// * [`LogError::InvalidHash`] if the message does not start at the log's
///   current end, or if the resulting hash does not match the signed one.
pub(crate) fn accept_append(
    &mut self,
    msg: &LogAppendMessage,
    source: UpdateSource,
) -> Result<(), LogError> {
    msg.new_hash.ensure_signed_by(&msg.log_id.0)?;

    let log = if KEEP_UNKNOWN {
        self.logs.entry(msg.log_id).or_default()
    } else if let Some(log) = self.logs.get_mut(&msg.log_id) {
        log
    } else {
        return Ok(());
    };

    // The signature covers only the running hash, not `after`, so the offset
    // has to be checked explicitly. Otherwise a message with a wrong `after`
    // whose bytes happen to continue the log would be accepted and forwarded
    // to listeners with a bogus offset. Checking first also avoids hashing
    // stale or out-of-order messages that could never match.
    if msg.after != log.data.len() {
        return Err(LogError::InvalidHash);
    }

    // Hash on a copy so a rejected message leaves the stored hasher untouched.
    let mut new_hasher = log.hasher.clone();
    new_hasher.update(&msg.append);
    if new_hasher.finalize() != msg.new_hash.verified {
        return Err(LogError::InvalidHash);
    }

    log.data.extend_from_slice(&msg.append);
    log.last_hash = Some(msg.new_hash.clone());
    log.hasher = new_hasher;
    log.listeners.emit((msg.clone(), source));
    Ok(())
}
/// Iterates over the IDs of all logs currently present in this collection.
/// The order is that of the underlying `HashMap`, i.e. unspecified.
pub(crate) fn all_log_ids(&self) -> impl Iterator<Item = &LogID> {
self.logs.keys()
}
/// Summarizes what this collection holds for `log_id`: the number of bytes
/// stored and the log's priority. Returns `None` if the log is not present.
pub(crate) fn known_state(&self, log_id: &LogID) -> Option<LogKnownState> {
    let state = self.logs.get(log_id)?;
    Some(LogKnownState {
        log_len: state.data.len(),
        priority: state.priority,
    })
}
/// Builds the append message that brings a party from `known_state` up to the
/// log's current contents.
///
/// A missing `known_state` is treated as a known length of 0. Returns `None`
/// when the log is not present or holds no bytes beyond the known length.
/// Otherwise the message starts at the known length, carries every byte after
/// it, and uses the log's latest signed hash.
///
/// # Panics
///
/// Panics if the log has data but no stored `last_hash`.
pub(crate) fn get_append_since(
    &self,
    log_id: &LogID,
    known_state: Option<&LogKnownState>,
) -> Option<LogAppendMessage> {
    let known_len = known_state.map_or(0, |state| state.log_len);
    let log = self.logs.get(log_id)?;

    if log.data.len() <= known_len {
        return None;
    }

    let new_hash = log
        .last_hash
        .clone()
        .expect("Should have last_hash with data");

    Some(LogAppendMessage {
        log_id: *log_id,
        after: known_len,
        append: log.data[known_len..].to_vec(),
        new_hash,
    })
}
/// Registers `listener` for appends to `log_id`.
///
/// The log's entry is created (empty) if it is not present yet. If the log
/// already holds data, the listener is registered with an initial message
/// containing the whole current contents (starting at offset 0, with the
/// latest signed hash), tagged `UpdateSource::CurrentState`; otherwise it is
/// registered without an initial message.
///
/// # Panics
///
/// Panics if the log has data but no stored `last_hash`.
pub fn add_listener(
    &mut self,
    log_id: LogID,
    listener: Box<dyn Listener<(LogAppendMessage, UpdateSource)>>,
) {
    let log = self.logs.entry(log_id).or_default();

    let initial_msg = if log.data.is_empty() {
        None
    } else {
        let new_hash = log
            .last_hash
            .clone()
            .expect("Should have last_hash with data");
        let snapshot = LogAppendMessage {
            log_id,
            after: 0,
            append: log.data.clone(),
            new_hash,
        };
        Some((snapshot, UpdateSource::CurrentState))
    };

    log.listeners.add_with_initial_msg(listener, initial_msg)
}
}