use std::{collections::HashMap, fmt::Debug, sync::Arc};
use futures::FutureExt;
use holo_hash::{ActionHash, AgentPubKey, EntryHash};
use holochain_keystore::MetaLairClient;
use holochain_serialized_bytes::SerializedBytesError;
use holochain_zome_types::prelude::*;
use must_future::MustBoxFuture;
use crate::chain::ChainItem;
#[async_trait::async_trait]
pub trait ChainHeadCoordinator {
type Item: ChainItem;
async fn add_records_request(&self, request: AddRecordsRequest) -> ChcResult<()>;
async fn get_record_data_request(
&self,
request: GetRecordsRequest,
) -> ChcResult<Vec<(SignedActionHashed, Option<(Arc<EncryptedEntry>, Signature)>)>>;
}
pub trait ChainHeadCoordinatorExt:
'static + Send + Sync + ChainHeadCoordinator<Item = SignedActionHashed>
{
fn signing_info(&self) -> (MetaLairClient, AgentPubKey);
fn add_records(self: Arc<Self>, records: Vec<Record>) -> MustBoxFuture<'static, ChcResult<()>> {
let (keystore, agent) = self.signing_info();
async move {
let payload = AddRecordPayload::from_records(keystore, agent, records).await?;
serde_json::to_string(&payload).unwrap();
self.add_records_request(payload).await
}
.boxed()
.into()
}
fn get_record_data(
self: Arc<Self>,
since_hash: Option<ActionHash>,
) -> MustBoxFuture<'static, ChcResult<Vec<Record>>> {
let mut bytes = [0; 32];
let _ = getrandom::getrandom(&mut bytes);
let nonce = Nonce256Bits::from(bytes);
let payload = GetRecordsPayload { since_hash, nonce };
let signature = Signature::from([0; 64]);
async move {
self.get_record_data_request(GetRecordsRequest { payload, signature })
.await?
.into_iter()
.map(|(a, me)| {
Ok(Record::new(
a,
me.map(|(e, _s)| holochain_serialized_bytes::decode(&e.0))
.transpose()?,
))
})
.collect()
}
.boxed()
.into()
}
fn head(self: Arc<Self>) -> MustBoxFuture<'static, ChcResult<Option<ActionHash>>> {
async move {
Ok(self
.get_record_data(None)
.await?
.pop()
.map(|r| r.action_address().clone()))
}
.boxed()
.into()
}
}
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct AddRecordPayload<A = SignedActionHashed> {
pub action: A,
pub encrypted_entry: Option<(Arc<EncryptedEntry>, Signature)>,
}
impl AddRecordPayload {
pub async fn from_records(
keystore: MetaLairClient,
agent_pubkey: AgentPubKey,
records: Vec<Record>,
) -> ChcResult<Vec<Self>> {
futures::future::join_all(records.into_iter().map(
|Record {
signed_action,
entry,
}| {
let keystore = keystore.clone();
let agent_pubkey = agent_pubkey.clone();
async move {
let action = signed_action;
let encrypted_entry_bytes = entry
.into_option()
.map(|entry| {
let entry = holochain_serialized_bytes::encode(&entry)?;
tracing::warn!(
"CHC is using unencrypted entry data. TODO: add encryption"
);
ChcResult::Ok(entry)
})
.transpose()?;
let encrypted_entry = if let Some(bytes) = encrypted_entry_bytes {
let signature = keystore
.sign(agent_pubkey.clone(), bytes.clone().into())
.await?;
Some((Arc::new(bytes.into()), signature))
} else {
None
};
ChcResult::Ok(AddRecordPayload {
action,
encrypted_entry,
})
}
},
))
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
}
}
pub type AddRecordsRequest = Vec<AddRecordPayload>;
#[derive(serde::Serialize, serde::Deserialize)]
pub struct GetRecordsPayload {
pub since_hash: Option<ActionHash>,
pub nonce: Nonce256Bits,
}
#[derive(serde::Serialize, serde::Deserialize)]
pub struct GetRecordsRequest {
pub payload: GetRecordsPayload,
pub signature: Signature,
}
#[derive(serde::Serialize, serde::Deserialize, derive_more::From)]
pub struct EncryptedEntry(#[serde(with = "serde_bytes")] pub Vec<u8>);
pub fn records_from_actions_and_entries(
actions: Vec<SignedActionHashed>,
mut entries: HashMap<EntryHash, Entry>,
) -> ChcResult<Vec<Record>> {
let mut records = vec![];
for action in actions {
let entry = if let Some(hash) = action.hashed.entry_hash() {
Some(
entries
.remove(hash)
.ok_or_else(|| ChcError::MissingEntryForAction(action.as_hash().clone()))?,
)
} else {
None
};
let record = Record::new(action, entry);
records.push(record);
}
Ok(records)
}
#[allow(missing_docs)]
#[derive(Debug, thiserror::Error)]
pub enum ChcError {
#[error(transparent)]
SerializationError(#[from] SerializedBytesError),
#[error(transparent)]
JsonSerializationError(#[from] serde_json::Error),
#[error(transparent)]
LairError(#[from] one_err::OneErr),
#[error("Local chain is out of sync with the CHC. The CHC head has advanced beyond the first action provided in the `add_records` request. Try calling `get_record_data` from hash {1} (sequence #{0}).")]
InvalidChain(u32, ActionHash),
#[error("Invalid `add_records` payload. Reason: {0}")]
NoRecordsAdded(String),
#[error("Missing Entry for ActionHash: {0}")]
MissingEntryForAction(ActionHash),
#[error("The CHC service is unreachable: {0}")]
ServiceUnreachable(String),
#[error("Unexpected error: {0}")]
Other(String),
}
#[allow(missing_docs)]
pub type ChcResult<T> = Result<T, ChcError>;