holochain_types/
validation_receipt.rs1use crate::prelude::{Signature, Timestamp};
4use futures::{Stream, StreamExt, TryStreamExt};
5use holo_hash::{AgentPubKey, DhtOpHash};
6use holochain_keystore::{AgentPubKeyExt, MetaLairClient};
7use holochain_serialized_bytes::prelude::*;
8use holochain_zome_types::prelude::*;
9use std::vec::IntoIter;
10
11#[derive(
13 Debug,
14 Clone,
15 PartialEq,
16 Eq,
17 PartialOrd,
18 Ord,
19 Hash,
20 serde::Serialize,
21 serde::Deserialize,
22 SerializedBytes,
23)]
24pub struct ValidationReceipt {
25 pub dht_op_hash: DhtOpHash,
27
28 pub validation_status: ValidationStatus,
30
31 pub validators: Vec<AgentPubKey>,
33
34 pub when_integrated: Timestamp,
36}
37
38impl ValidationReceipt {
39 pub async fn sign(
41 self,
42 keystore: &MetaLairClient,
43 ) -> holochain_keystore::LairResult<Option<SignedValidationReceipt>> {
44 if self.validators.is_empty() {
45 return Ok(None);
46 }
47 let this = self.clone();
48 let futures = self
52 .validators
53 .iter()
54 .map(|validator| {
55 let this = this.clone();
56 let validator = validator.clone();
57 let keystore = keystore.clone();
58 async move { validator.sign(&keystore, this).await }
59 })
60 .collect::<Vec<_>>();
61 let stream = futures::stream::iter(futures);
62 let signatures = try_stream_of_results(stream).await?;
63 if signatures.is_empty() {
64 unreachable!("Signatures cannot be empty because the validators vec is not empty");
65 }
66 Ok(Some(SignedValidationReceipt {
67 receipt: self,
68 validators_signatures: signatures,
69 }))
70 }
71}
72
73async fn try_stream_of_results<T, U, E>(stream: U) -> Result<Vec<T>, E>
75where
76 U: Stream,
77 <U as Stream>::Item: futures::Future<Output = Result<T, E>>,
78{
79 stream.buffer_unordered(10).map(|r| r).try_collect().await
80}
81
82#[derive(
84 Debug,
85 Clone,
86 PartialEq,
87 Eq,
88 PartialOrd,
89 Ord,
90 Hash,
91 serde::Serialize,
92 serde::Deserialize,
93 SerializedBytes,
94)]
95pub struct SignedValidationReceipt {
96 pub receipt: ValidationReceipt,
98
99 pub validators_signatures: Vec<Signature>,
103}
104
105#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, SerializedBytes)]
107pub struct ValidationReceiptBundle(Vec<SignedValidationReceipt>);
108
109impl From<Vec<SignedValidationReceipt>> for ValidationReceiptBundle {
110 fn from(value: Vec<SignedValidationReceipt>) -> Self {
111 ValidationReceiptBundle(value)
112 }
113}
114
115impl IntoIterator for ValidationReceiptBundle {
116 type Item = SignedValidationReceipt;
117 type IntoIter = IntoIter<Self::Item>;
118
119 fn into_iter(self) -> Self::IntoIter {
120 self.0.into_iter()
121 }
122}
123
#[cfg(test)]
mod tests {
    use crate::validation_receipt::try_stream_of_results;

    /// Checks `try_stream_of_results` against empty, single-item and
    /// multi-item streams, for both all-success and all-failure inputs.
    #[tokio::test]
    async fn test_try_stream_of_results() {
        // Empty stream: nothing to collect.
        let no_futures: Vec<futures::future::Ready<Result<i32, String>>> = Vec::new();
        let collected = try_stream_of_results(futures::stream::iter(no_futures)).await;
        assert_eq!(Ok(vec![]), collected);

        // A single successful future.
        let single_ok = vec![async move { Result::<_, String>::Ok(0) }];
        let collected = try_stream_of_results(futures::stream::iter(single_ok)).await;
        assert_eq!(Ok(vec![0]), collected);

        // Several successful futures.
        let many_ok = (0..10).map(|i| async move { Result::<_, String>::Ok(i) });
        let expected = (0..10).collect::<Vec<_>>();
        let collected = try_stream_of_results(futures::stream::iter(many_ok)).await;
        assert_eq!(Ok(expected), collected);

        // A single failing future surfaces its error.
        let single_err = vec![async move { Result::<i32, String>::Err("test".to_string()) }];
        let collected = try_stream_of_results(futures::stream::iter(single_err)).await;
        assert_eq!(Err("test".to_string()), collected);

        // When every future fails, an error is still what comes back.
        let many_err =
            (0..10).map(|_| async move { Result::<i32, String>::Err("test".to_string()) });
        let collected = try_stream_of_results(futures::stream::iter(many_err)).await;
        assert_eq!(Err("test".to_string()), collected);
    }
}