1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
//! Defines the Chain Head Coordination API.
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use futures::FutureExt;
use holo_hash::{ActionHash, AgentPubKey, EntryHash};
use holochain_keystore::{AgentPubKeyExt, MetaLairClient};
use holochain_serialized_bytes::SerializedBytesError;
use holochain_zome_types::prelude::*;
use must_future::MustBoxFuture;
use crate::chain::ChainItem;
/// The API which a Chain Head Coordinator service must implement.
#[async_trait::async_trait]
pub trait ChainHeadCoordinator {
/// The item which the chain is made of.
type Item: ChainItem;
/// Request that the CHC append these records to its chain.
///
/// Whenever Holochain is about to commit something, this function will first be called.
/// The CHC will do some integrity checks, which may fail.
/// All signatures and hashes need to line up properly.
/// If the records added would result in a fork, then a [`ChcError::OutOfSync`] will be returned
/// along with the current
// If there is an out-of-sync error, it will return a hash, designating the point of fork.
async fn add_records_request(&self, request: AddRecordsRequest) -> ChcResult<()>;
/// Get actions after (not including) the given hash.
async fn get_record_data_request(
&self,
request: GetRecordsRequest,
) -> ChcResult<Vec<(SignedActionHashed, Option<(Arc<EncryptedEntry>, Signature)>)>>;
}
/// Add some convenience methods to the CHC trait
pub trait ChainHeadCoordinatorExt:
'static + Send + Sync + ChainHeadCoordinator<Item = SignedActionHashed>
{
/// Get info necessary for signing
fn signing_info(&self) -> (MetaLairClient, AgentPubKey);
/// More convenient way to call `add_records_request`
fn add_records(self: Arc<Self>, records: Vec<Record>) -> MustBoxFuture<'static, ChcResult<()>> {
let (keystore, agent) = self.signing_info();
async move {
let payload = AddRecordPayload::from_records(keystore, agent, records).await?;
self.add_records_request(payload).await
}
.boxed()
.into()
}
/// More convenient way to call `get_record_data_request`
fn get_record_data(
self: Arc<Self>,
since_hash: Option<ActionHash>,
) -> MustBoxFuture<'static, ChcResult<Vec<Record>>> {
let (keystore, agent) = self.signing_info();
async move {
let mut bytes = [0; 32];
getrandom::getrandom(&mut bytes).map_err(|e| ChcError::Other(e.to_string()))?;
let nonce = Nonce256Bits::from(bytes);
let payload = GetRecordsPayload { since_hash, nonce };
let signature = agent.sign(&keystore, &payload).await?;
self.get_record_data_request(GetRecordsRequest { payload, signature })
.await?
.into_iter()
.map(|(a, me)| {
Ok(Record::new(
a,
me.map(|(e, _s)| holochain_serialized_bytes::decode(&e.0))
.transpose()?,
))
})
.collect()
}
.boxed()
.into()
}
/// Just a convenience for testing. Should not be used otherwise.
#[cfg(feature = "test_utils")]
fn head(self: Arc<Self>) -> MustBoxFuture<'static, ChcResult<Option<ActionHash>>> {
async move {
Ok(self
.get_record_data(None)
.await?
.pop()
.map(|r| r.action_address().clone()))
}
.boxed()
.into()
}
}
/// A Record to be added to the CHC.
///
/// The SignedActionHashed is constructed as usual.
/// The Entry data is encrypted (TODO: by which key?), and the encrypted data
/// is signed by the agent. This ensures that only the correct agent is adding
/// records to its CHC. This EncryptedEntry signature is not used anywhere
/// outside the context of the CHC.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct AddRecordPayload {
/// The msgpack-encoded SignedActionHashed for the Record. This is encoded as such because the CHC
/// needs to verify the signature, and these are the exact bytes which are signed, so
/// this removes the need to deserialize and then re-serialize.
///
/// This must be deserialized as `SignedActionHashed`.
#[serde(with = "serde_bytes")]
pub signed_action_msgpack: Vec<u8>,
/// The signature of the SignedActionHashed
/// (NOTE: usually signatures are of just the Action, but in this case we want to
/// include the entire struct in the signature so we don't have to recalculate that on the CHC)
pub signed_action_signature: Signature,
/// The entry, encrypted (TODO: by which key?), with the signature of
/// of the encrypted bytes
pub encrypted_entry: Option<(Arc<EncryptedEntry>, Signature)>,
}
impl AddRecordPayload {
/// Create a payload from a list of records.
/// This performs the necessary signing and encryption the CHC requires.
pub async fn from_records(
keystore: MetaLairClient,
agent_pubkey: AgentPubKey,
records: Vec<Record>,
) -> ChcResult<Vec<Self>> {
futures::future::join_all(records.into_iter().map(
|Record {
signed_action,
entry,
}| {
let keystore = keystore.clone();
let agent_pubkey = agent_pubkey.clone();
async move {
let encrypted_entry_bytes = entry
.into_option()
.map(|entry| {
let entry = holochain_serialized_bytes::encode(&entry)?;
tracing::warn!(
"CHC is using unencrypted entry data. TODO: add encryption"
);
ChcResult::Ok(entry)
})
.transpose()?;
let encrypted_entry = if let Some(bytes) = encrypted_entry_bytes {
let signature = keystore
.sign(agent_pubkey.clone(), bytes.clone().into())
.await?;
Some((Arc::new(bytes.into()), signature))
} else {
None
};
let signed_action_msgpack = holochain_serialized_bytes::encode(&signed_action)?;
let author = signed_action.action().author();
let signed_action_signature = author
.sign_raw(&keystore, signed_action_msgpack.clone().into())
.await?;
assert!(
author
.verify_signature_raw(
&signed_action_signature,
signed_action_msgpack.clone().into()
)
.await
);
ChcResult::Ok(AddRecordPayload {
signed_action_msgpack,
signed_action_signature,
encrypted_entry,
})
}
},
))
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
}
}
/// The request type for `add_records`
pub type AddRecordsRequest = Vec<AddRecordPayload>;
/// The request to retrieve records from the CHC.
///
/// If a `since_hash` is specified, all records with sequence numbers at and
/// above the one at the given hash will be returned. If no `since_hash` is
/// given, then all records will be returned.
///
/// Since this payload is signed, including a unique nonce helps prevent replay
/// attacks.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct GetRecordsPayload {
/// Only records beyond and including this hash are returned
pub since_hash: Option<ActionHash>,
/// Randomly selected nonce to prevent replay attacks
pub nonce: Nonce256Bits,
}
/// The full request for get_record_data
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct GetRecordsRequest {
/// The payload
pub payload: GetRecordsPayload,
/// The signature of the payload
pub signature: Signature,
}
/// Encrypted bytes of an Entry
#[derive(Debug, serde::Serialize, serde::Deserialize, derive_more::From)]
pub struct EncryptedEntry(#[serde(with = "serde_bytes")] pub Vec<u8>);
/// Assemble records from a list of Actions and a map of Entries
pub fn records_from_actions_and_entries(
actions: Vec<SignedActionHashed>,
mut entries: HashMap<EntryHash, Entry>,
) -> ChcResult<Vec<Record>> {
let mut records = vec![];
for action in actions {
let entry = if let Some(hash) = action.hashed.entry_hash() {
Some(
entries
.remove(hash)
.ok_or_else(|| ChcError::MissingEntryForAction(action.as_hash().clone()))?,
)
} else {
None
};
let record = Record::new(action, entry);
records.push(record);
}
Ok(records)
}
#[allow(missing_docs)]
#[derive(Debug, thiserror::Error)]
pub enum ChcError {
#[error(transparent)]
SerializationError(#[from] SerializedBytesError),
#[error(transparent)]
JsonSerializationError(#[from] serde_json::Error),
#[error(transparent)]
LairError(#[from] one_err::OneErr),
/// The out of sync error only happens when you attempt to add actions
/// that would cause a fork with respect to the CHC. This can be remedied
/// by syncing.
#[error("Local chain is out of sync with the CHC. The CHC head has advanced beyond the first action provided in the `add_records` request. Try calling `get_record_data` from hash {1} (sequence #{0}).")]
InvalidChain(u32, ActionHash),
/// All other errors are due to an invalid request, which is a mistake
/// that can't be remedied other than by fixing the programming mistake
/// (which would be on the Holochain side)
/// Examples include:
/// - Vec<AddRecordPayload> must be sorted by `seq_number`
/// - There is a gap between the first action and the current CHC head
/// - The `Vec<AddRecordPayload>` does not constitute a valid chain (prev_action must be correct)
#[error("Invalid `add_records` payload. Seq number: {0}")]
NoRecordsAdded(u32),
/// An Action which has an entry was returned without the Entry
#[error("Missing Entry for ActionHash: {0}")]
MissingEntryForAction(ActionHash),
#[error("The CHC service is unreachable: {0}")]
ServiceUnreachable(String),
/// Unexpected error
#[error("Unexpected error: {0}")]
Other(String),
}
#[allow(missing_docs)]
pub type ChcResult<T> = Result<T, ChcError>;