use crate::prelude::{Signature, Timestamp};
use futures::{Stream, StreamExt, TryStreamExt};
use holo_hash::{AgentPubKey, DhtOpHash};
use holochain_keystore::{AgentPubKeyExt, MetaLairClient};
use holochain_serialized_bytes::prelude::*;
use holochain_zome_types::ValidationStatus;
use std::vec::IntoIter;
#[derive(
Debug,
Clone,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
serde::Serialize,
serde::Deserialize,
SerializedBytes,
)]
pub struct ValidationReceipt {
pub dht_op_hash: DhtOpHash,
pub validation_status: ValidationStatus,
pub validators: Vec<AgentPubKey>,
pub when_integrated: Timestamp,
}
impl ValidationReceipt {
pub async fn sign(
self,
keystore: &MetaLairClient,
) -> holochain_keystore::LairResult<Option<SignedValidationReceipt>> {
if self.validators.is_empty() {
return Ok(None);
}
let this = self.clone();
let futures = self
.validators
.iter()
.map(|validator| {
let this = this.clone();
let validator = validator.clone();
let keystore = keystore.clone();
async move { validator.sign(&keystore, this).await }
})
.collect::<Vec<_>>();
let stream = futures::stream::iter(futures);
let signatures = try_stream_of_results(stream).await?;
if signatures.is_empty() {
unreachable!("Signatures cannot be empty because the validators vec is not empty");
}
Ok(Some(SignedValidationReceipt {
receipt: self,
validators_signatures: signatures,
}))
}
}
async fn try_stream_of_results<T, U, E>(stream: U) -> Result<Vec<T>, E>
where
U: Stream,
<U as Stream>::Item: futures::Future<Output = Result<T, E>>,
{
stream.buffer_unordered(10).map(|r| r).try_collect().await
}
#[derive(
Debug,
Clone,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
serde::Serialize,
serde::Deserialize,
SerializedBytes,
)]
pub struct SignedValidationReceipt {
pub receipt: ValidationReceipt,
pub validators_signatures: Vec<Signature>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, SerializedBytes)]
pub struct ValidationReceiptBundle(Vec<SignedValidationReceipt>);
impl From<Vec<SignedValidationReceipt>> for ValidationReceiptBundle {
fn from(value: Vec<SignedValidationReceipt>) -> Self {
ValidationReceiptBundle(value)
}
}
impl IntoIterator for ValidationReceiptBundle {
type Item = SignedValidationReceipt;
type IntoIter = IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
#[cfg(test)]
pub mod tests {
use crate::validation_receipt::try_stream_of_results;
#[tokio::test]
async fn test_try_stream_of_results() {
let iter: Vec<futures::future::Ready<Result<i32, String>>> = vec![];
let stream = futures::stream::iter(iter);
assert_eq!(Ok(vec![]), try_stream_of_results(stream).await);
let iter = vec![async move { Result::<_, String>::Ok(0) }];
let stream = futures::stream::iter(iter);
assert_eq!(Ok(vec![0]), try_stream_of_results(stream).await);
let iter = (0..10).map(|i| async move { Result::<_, String>::Ok(i) });
let stream = futures::stream::iter(iter);
assert_eq!(
Ok((0..10).collect::<Vec<_>>()),
try_stream_of_results(stream).await
);
let iter = vec![async move { Result::<i32, String>::Err("test".to_string()) }];
let stream = futures::stream::iter(iter);
assert_eq!(Err("test".to_string()), try_stream_of_results(stream).await);
let iter = (0..10).map(|_| async move { Result::<i32, String>::Err("test".to_string()) });
let stream = futures::stream::iter(iter);
assert_eq!(Err("test".to_string()), try_stream_of_results(stream).await);
}
}