use futures::FutureExt;
use holochain_keystore::{AgentPubKeyExt, MetaLairClient};
use holochain_nonce::Nonce256Bits;
use holochain_serialized_bytes::SerializedBytesError;
use holochain_types::chain::ChainItem;
use holochain_types::prelude::*;
use must_future::MustBoxFuture;
use std::{fmt::Debug, sync::Arc};
pub mod chc_local;
#[cfg(feature = "http")]
pub mod chc_http;
#[async_trait::async_trait]
pub trait ChainHeadCoordinator {
type Item: ChainItem;
async fn add_records_request(&self, request: AddRecordsRequest) -> ChcResult<()>;
async fn get_record_data_request(
&self,
request: GetRecordsRequest,
) -> ChcResult<Vec<(SignedActionHashed, Option<(Arc<EncryptedEntry>, Signature)>)>>;
}
pub trait ChainHeadCoordinatorExt:
'static + Send + Sync + ChainHeadCoordinator<Item = SignedActionHashed>
{
fn signing_info(&self) -> (MetaLairClient, AgentPubKey);
fn add_records(self: Arc<Self>, records: Vec<Record>) -> MustBoxFuture<'static, ChcResult<()>> {
let (keystore, agent) = self.signing_info();
async move {
let payload = AddRecordPayload::from_records(keystore, agent, records).await?;
self.add_records_request(payload).await
}
.boxed()
.into()
}
fn get_record_data(
self: Arc<Self>,
since_hash: Option<ActionHash>,
) -> MustBoxFuture<'static, ChcResult<Vec<Record>>> {
let (keystore, agent) = self.signing_info();
async move {
let mut bytes = [0; 32];
getrandom::fill(&mut bytes).map_err(|e| ChcError::Other(e.to_string()))?;
let nonce = Nonce256Bits::from(bytes);
let payload = GetRecordsPayload { since_hash, nonce };
let signature = agent.sign(&keystore, &payload).await?;
self.get_record_data_request(GetRecordsRequest { payload, signature })
.await?
.into_iter()
.map(|(a, me)| {
Ok(Record::new(
a,
me.map(|(e, _s)| holochain_serialized_bytes::decode(&e.0))
.transpose()?,
))
})
.collect()
}
.boxed()
.into()
}
#[cfg(feature = "test_utils")]
fn head(self: Arc<Self>) -> MustBoxFuture<'static, ChcResult<Option<ActionHash>>> {
async move {
Ok(self
.get_record_data(None)
.await?
.pop()
.map(|r| r.action_address().clone()))
}
.boxed()
.into()
}
}
pub type ChcImpl = Arc<dyn 'static + Send + Sync + ChainHeadCoordinatorExt>;
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct AddRecordPayload {
#[serde(with = "serde_bytes")]
pub signed_action_msgpack: Vec<u8>,
pub signed_action_signature: Signature,
pub encrypted_entry: Option<(Arc<EncryptedEntry>, Signature)>,
}
impl AddRecordPayload {
#[cfg_attr(feature = "instrument", tracing::instrument(skip(keystore, records)))]
pub async fn from_records(
keystore: MetaLairClient,
agent_pubkey: AgentPubKey,
records: Vec<Record>,
) -> ChcResult<Vec<Self>> {
futures::future::join_all(records.into_iter().map(
|Record {
signed_action,
entry,
}| {
let keystore = keystore.clone();
let agent_pubkey = agent_pubkey.clone();
async move {
let encrypted_entry_bytes = entry
.into_option()
.map(|entry| {
let entry = holochain_serialized_bytes::encode(&entry)?;
tracing::warn!(
"CHC is using unencrypted entry data. TODO: add encryption"
);
ChcResult::Ok(entry)
})
.transpose()?;
let encrypted_entry = if let Some(bytes) = encrypted_entry_bytes {
let signature = keystore
.sign(agent_pubkey.clone(), bytes.clone().into())
.await?;
Some((Arc::new(bytes.into()), signature))
} else {
None
};
let signed_action_msgpack = holochain_serialized_bytes::encode(&signed_action)?;
let author = signed_action.action().author();
let signed_action_signature = author
.sign_raw(&keystore, signed_action_msgpack.clone().into())
.await?;
assert!(author
.verify_signature_raw(
&signed_action_signature,
signed_action_msgpack.clone().into()
)
.await
.unwrap());
ChcResult::Ok(AddRecordPayload {
signed_action_msgpack,
signed_action_signature,
encrypted_entry,
})
}
},
))
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
}
}
pub type AddRecordsRequest = Vec<AddRecordPayload>;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct GetRecordsPayload {
pub since_hash: Option<ActionHash>,
pub nonce: Nonce256Bits,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct GetRecordsRequest {
pub payload: GetRecordsPayload,
pub signature: Signature,
}
#[derive(Debug, serde::Serialize, serde::Deserialize, derive_more::From)]
pub struct EncryptedEntry(#[serde(with = "serde_bytes")] pub Vec<u8>);
#[allow(missing_docs)]
#[derive(Debug, thiserror::Error)]
pub enum ChcError {
#[error(transparent)]
SerializationError(#[from] SerializedBytesError),
#[error(transparent)]
JsonSerializationError(#[from] serde_json::Error),
#[error(transparent)]
LairError(#[from] one_err::OneErr),
#[error("Local chain is out of sync with the CHC. The CHC head has advanced beyond the first action provided in the `add_records` request. Try calling `get_record_data` from hash {1} (sequence #{0}).")]
InvalidChain(u32, ActionHash),
#[error("Invalid `add_records` payload. Seq number: {0}")]
NoRecordsAdded(u32),
#[error("Missing Entry for ActionHash: {0}")]
MissingEntryForAction(ActionHash),
#[error("The CHC service is unreachable: {0}")]
ServiceUnreachable(String),
#[error("Unexpected error: {0}")]
Other(String),
}
#[allow(missing_docs)]
pub type ChcResult<T> = Result<T, ChcError>;