// p2panda_stream/operation.rs

// SPDX-License-Identifier: MIT OR Apache-2.0

//! Methods to handle p2panda operations.
use p2panda_core::{
    validate_backlink, validate_operation, Body, Extensions, Header, Operation, OperationError,
};
use p2panda_store::{LogStore, OperationStore};
use thiserror::Error;

10/// Checks an incoming operation for log integrity and persists it into the store when valid.
11///
12/// This method also automatically prunes the log when a prune flag was set.
13///
14/// If the operation seems valid but we're still lacking information (as it might have arrived
15/// out-of-order) this method does not fail but indicates that we might have to retry again later.
16pub async fn ingest_operation<S, L, E>(
17    store: &mut S,
18    header: Header<E>,
19    body: Option<Body>,
20    header_bytes: Vec<u8>,
21    log_id: &L,
22    prune_flag: bool,
23) -> Result<IngestResult<E>, IngestError>
24where
25    S: OperationStore<L, E> + LogStore<L, E>,
26    E: Extensions,
27{
28    let operation = Operation {
29        hash: header.hash(),
30        header,
31        body,
32    };
33
34    if let Err(err) = validate_operation(&operation) {
35        return Err(IngestError::InvalidOperation(err));
36    }
37
38    let already_exists = store
39        .has_operation(operation.hash)
40        .await
41        .map_err(|err| IngestError::StoreError(err.to_string()))?;
42    if !already_exists {
43        // If no pruning flag is set, we expect the log to have integrity with the previously given
44        // operation.
45        if !prune_flag && operation.header.seq_num > 0 {
46            let latest_operation = store
47                .latest_operation(&operation.header.public_key, log_id)
48                .await
49                .map_err(|err| IngestError::StoreError(err.to_string()))?;
50
51            match latest_operation {
52                Some(latest_operation) => {
53                    if let Err(err) = validate_backlink(&latest_operation.0, &operation.header) {
54                        match err {
55                            // These errors signify that the sequence number is monotonic
56                            // incrementing and correct, however the backlink does not match.
57                            OperationError::BacklinkMismatch
58                            | OperationError::BacklinkMissing
59                            // Log can only contain operations from one author.
60                            | OperationError::TooManyAuthors => {
61                                return Err(IngestError::InvalidOperation(err))
62                            }
63                            // We observe a gap in the log and therefore can't validate the
64                            // backlink yet.
65                            OperationError::SeqNumNonIncremental(expected, given) => {
66                                return Ok(IngestResult::Retry(operation.header, operation.body, header_bytes, given - expected))
67                            }
68                            _ => unreachable!("other error cases have been handled before"),
69                        }
70                    }
71                }
72                // We're missing the whole log so far.
73                None => {
74                    return Ok(IngestResult::Retry(
75                        operation.header.clone(),
76                        operation.body.clone(),
77                        header_bytes,
78                        operation.header.seq_num,
79                    ))
80                }
81            }
82        }
83
84        store
85            .insert_operation(
86                operation.hash,
87                &operation.header,
88                operation.body.as_ref(),
89                &header_bytes,
90                log_id,
91            )
92            .await
93            .map_err(|err| IngestError::StoreError(err.to_string()))?;
94
95        if prune_flag {
96            store
97                .delete_operations(
98                    &operation.header.public_key,
99                    log_id,
100                    operation.header.seq_num,
101                )
102                .await
103                .map_err(|err| IngestError::StoreError(err.to_string()))?;
104        }
105    }
106
107    Ok(IngestResult::Complete(operation))
108}
109
110/// Operations can be ingested directly or need to be re-tried if they arrived out-of-order.
111#[derive(Debug)]
112pub enum IngestResult<E> {
113    /// Operation has been successfully validated and persisted.
114    Complete(Operation<E>),
115
116    /// We're missing previous operations before we can try validating the backlink of this
117    /// operation.
118    ///
119    /// The number indicates how many operations we are lacking before we can attempt validation
120    /// again.
121    Retry(Header<E>, Option<Body>, Vec<u8>, u64),
122}
123
124/// Errors which can occur due to invalid operations or critical storage failures.
125#[derive(Clone, Debug, Error)]
126pub enum IngestError {
127    /// Operation can not be authenticated, has broken log- or payload integrity or doesn't follow
128    /// the p2panda specification.
129    #[error("operation validation failed: {0}")]
130    InvalidOperation(OperationError),
131
132    /// Header did not contain the extensions required by the p2panda specification.
133    #[error("missing \"{0}\" extension in header")]
134    MissingHeaderExtension(String),
135
136    /// Critical storage failure occurred. This is usually a reason to panic.
137    #[error("critical storage failure: {0}")]
138    StoreError(String),
139
140    /// Some implementations might optimistically retry to ingest operations which arrived
141    /// out-of-order. This error comes up when all given attempts have been exhausted.
142    #[error("too many attempts to ingest out-of-order operation ({0} behind in log)")]
143    MaxAttemptsReached(u64),
144}
145
#[cfg(test)]
mod tests {
    use p2panda_core::{Hash, Header, PrivateKey};
    use p2panda_store::MemoryStore;

    use crate::operation::{ingest_operation, IngestResult};
    use crate::test_utils::Extensions;

    /// Builds and signs a header with the given sequence number and backlink, returning it
    /// together with its encoded bytes.
    fn signed_header(
        private_key: &PrivateKey,
        seq_num: u64,
        backlink: Option<Hash>,
    ) -> (Header<Extensions>, Vec<u8>) {
        let mut header = Header {
            public_key: private_key.public_key(),
            version: 1,
            signature: None,
            payload_size: 0,
            payload_hash: None,
            timestamp: 0,
            seq_num,
            backlink,
            previous: vec![],
            extensions: None,
        };
        header.sign(private_key);
        let header_bytes = header.to_bytes();
        (header, header_bytes)
    }

    #[tokio::test]
    async fn retry_result() {
        let mut store = MemoryStore::<usize, Extensions>::new();
        let private_key = PrivateKey::new();
        let log_id = 1;

        // 1. Create a regular first operation in a log.
        let (header, header_bytes) = signed_header(&private_key, 0, None);
        let result = ingest_operation(&mut store, header, None, header_bytes, &log_id, false).await;
        assert!(matches!(result, Ok(IngestResult::Complete(_))));

        // 2. Create an operation which has already advanced in the log (it has a backlink and
        //    higher sequence number). We'll be missing 11 operations between the first and this
        //    one.
        let (header, header_bytes) =
            signed_header(&private_key, 12, Some(Hash::new(b"mock operation")));
        let result = ingest_operation(&mut store, header, None, header_bytes, &log_id, false).await;
        assert!(matches!(result, Ok(IngestResult::Retry(_, None, _, 11))));
    }
}
199}