holochain_types/
validation_receipt.rs

1//! Types for validation receipts and signed validation receipts to be sent between peers.
2
3use crate::prelude::{Signature, Timestamp};
4use futures::{Stream, StreamExt, TryStreamExt};
5use holo_hash::{AgentPubKey, DhtOpHash};
6use holochain_keystore::{AgentPubKeyExt, MetaLairClient};
7use holochain_serialized_bytes::prelude::*;
8use holochain_zome_types::prelude::*;
9use std::vec::IntoIter;
10
11/// Validation receipt content - to be signed.
12#[derive(
13    Debug,
14    Clone,
15    PartialEq,
16    Eq,
17    PartialOrd,
18    Ord,
19    Hash,
20    serde::Serialize,
21    serde::Deserialize,
22    SerializedBytes,
23)]
24pub struct ValidationReceipt {
25    /// the op this validation receipt is for.
26    pub dht_op_hash: DhtOpHash,
27
28    /// the result of this validation.
29    pub validation_status: ValidationStatus,
30
31    /// the remote validator which is signing this receipt.
32    pub validators: Vec<AgentPubKey>,
33
34    /// Time when the op was integrated
35    pub when_integrated: Timestamp,
36}
37
38impl ValidationReceipt {
39    /// Sign this validation receipt.
40    pub async fn sign(
41        self,
42        keystore: &MetaLairClient,
43    ) -> holochain_keystore::LairResult<Option<SignedValidationReceipt>> {
44        if self.validators.is_empty() {
45            return Ok(None);
46        }
47        let this = self.clone();
48        // Try to sign with all validators but silently fail on
49        // any that cannot sign.
50        // If all signatures fail then return an error.
51        let futures = self
52            .validators
53            .iter()
54            .map(|validator| {
55                let this = this.clone();
56                let validator = validator.clone();
57                let keystore = keystore.clone();
58                async move { validator.sign(&keystore, this).await }
59            })
60            .collect::<Vec<_>>();
61        let stream = futures::stream::iter(futures);
62        let signatures = try_stream_of_results(stream).await?;
63        if signatures.is_empty() {
64            unreachable!("Signatures cannot be empty because the validators vec is not empty");
65        }
66        Ok(Some(SignedValidationReceipt {
67            receipt: self,
68            validators_signatures: signatures,
69        }))
70    }
71}
72
73/// Try to collect a stream of futures that return results into a vec.
74async fn try_stream_of_results<T, U, E>(stream: U) -> Result<Vec<T>, E>
75where
76    U: Stream,
77    <U as Stream>::Item: futures::Future<Output = Result<T, E>>,
78{
79    stream.buffer_unordered(10).map(|r| r).try_collect().await
80}
81
82/// A full, signed validation receipt.
83#[derive(
84    Debug,
85    Clone,
86    PartialEq,
87    Eq,
88    PartialOrd,
89    Ord,
90    Hash,
91    serde::Serialize,
92    serde::Deserialize,
93    SerializedBytes,
94)]
95pub struct SignedValidationReceipt {
96    /// the content of the validation receipt.
97    pub receipt: ValidationReceipt,
98
99    // TODO This is just the signature and not the original message, should this be a full signature and get validated
100    //      when it is received? https://github.com/holochain/holochain/pull/2848#discussion_r1346160783
101    /// the signature of the remote validator.
102    pub validators_signatures: Vec<Signature>,
103}
104
105/// A bundle of validation receipts to be sent together.
106#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, SerializedBytes)]
107pub struct ValidationReceiptBundle(Vec<SignedValidationReceipt>);
108
109impl From<Vec<SignedValidationReceipt>> for ValidationReceiptBundle {
110    fn from(value: Vec<SignedValidationReceipt>) -> Self {
111        ValidationReceiptBundle(value)
112    }
113}
114
115impl IntoIterator for ValidationReceiptBundle {
116    type Item = SignedValidationReceipt;
117    type IntoIter = IntoIter<Self::Item>;
118
119    fn into_iter(self) -> Self::IntoIter {
120        self.0.into_iter()
121    }
122}
123
124#[cfg(test)]
125mod tests {
126    use crate::validation_receipt::try_stream_of_results;
127
128    #[tokio::test]
129    async fn test_try_stream_of_results() {
130        let iter: Vec<futures::future::Ready<Result<i32, String>>> = vec![];
131        let stream = futures::stream::iter(iter);
132        assert_eq!(Ok(vec![]), try_stream_of_results(stream).await);
133
134        let iter = vec![async move { Result::<_, String>::Ok(0) }];
135        let stream = futures::stream::iter(iter);
136        assert_eq!(Ok(vec![0]), try_stream_of_results(stream).await);
137
138        let iter = (0..10).map(|i| async move { Result::<_, String>::Ok(i) });
139        let stream = futures::stream::iter(iter);
140        assert_eq!(
141            Ok((0..10).collect::<Vec<_>>()),
142            try_stream_of_results(stream).await
143        );
144
145        let iter = vec![async move { Result::<i32, String>::Err("test".to_string()) }];
146        let stream = futures::stream::iter(iter);
147        assert_eq!(Err("test".to_string()), try_stream_of_results(stream).await);
148
149        let iter = (0..10).map(|_| async move { Result::<i32, String>::Err("test".to_string()) });
150        let stream = futures::stream::iter(iter);
151        assert_eq!(Err("test".to_string()), try_stream_of_results(stream).await);
152    }
153}