// p2panda-stream: operation.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! Methods to handle p2panda operations.
4use p2panda_core::{
5    Body, Extensions, Header, Operation, OperationError, validate_backlink, validate_operation,
6};
7use p2panda_store::{LogStore, OperationStore};
8use thiserror::Error;
9
10/// Checks an incoming operation for log integrity and persists it into the store when valid.
11///
12/// This method also automatically prunes the log when a prune flag was set.
13///
14/// If the operation seems valid but we're still lacking information (as it might have arrived
15/// out-of-order) this method does not fail but indicates that we might have to retry again later.
16pub async fn ingest_operation<S, L, E>(
17    store: &mut S,
18    header: Header<E>,
19    body: Option<Body>,
20    header_bytes: Vec<u8>,
21    log_id: &L,
22    prune_flag: bool,
23) -> Result<IngestResult<E>, IngestError>
24where
25    S: OperationStore<L, E> + LogStore<L, E>,
26    E: Extensions,
27{
28    let operation = Operation {
29        hash: header.hash(),
30        header,
31        body,
32    };
33
34    if let Err(err) = validate_operation(&operation) {
35        return Err(IngestError::InvalidOperation(err));
36    }
37
38    let already_exists = store
39        .has_operation(operation.hash)
40        .await
41        .map_err(|err| IngestError::StoreError(err.to_string()))?;
42    if !already_exists {
43        // If no pruning flag is set, we expect the log to have integrity with the previously given
44        // operation.
45        if !prune_flag {
46            let latest_operation = store
47                .latest_operation(&operation.header.public_key, log_id)
48                .await
49                .map_err(|err| IngestError::StoreError(err.to_string()))?;
50
51            match latest_operation {
52                Some(latest_operation) => {
53                    if let Err(err) = validate_backlink(&latest_operation.0, &operation.header) {
54                        match err {
55                            // These errors signify that the sequence number is monotonic
56                            // incrementing and correct, however the backlink does not match.
57                            OperationError::BacklinkMismatch
58                            | OperationError::BacklinkMissing
59                            // Log can only contain operations from one author.
60                            | OperationError::TooManyAuthors => {
61                                return Err(IngestError::InvalidOperation(err))
62                            }
63                            // We observe a gap in the log and therefore can't validate the
64                            // backlink yet.
65                            OperationError::SeqNumNonIncremental(expected, given) => {
66                                // Overflows happen when the given operation is already "outdated"
67                                // and was pruned and our log grew in the meantime (thus "latest
68                                // operation" returns a higher sequence number). This is a race
69                                // condition which can occur in this async setting.
70                                //
71                                // We can safely ignore the operation here. It will _not_ be
72                                // persisted as it technically was pruned already, we will still
73                                // forward it to the application layer though. The application can
74                                // decide if it should ignore or handle the "outdated" operation.
75                                //
76                                // TODO(adz): This implementation would show undefined behaviour
77                                // when handling forks in pruned logs in presence of this race
78                                // condition. This is a rather edge case, but needs to be dealt
79                                // with.
80                                let (behind, overflow) = given.overflowing_sub(expected);
81                                if overflow {
82                                    return Ok(IngestResult::Outdated(operation))
83                                } else {
84                                    return Ok(IngestResult::Retry(operation.header, operation.body, header_bytes, behind));
85                                }
86                            }
87                            _ => unreachable!("other error cases have been handled before"),
88                        }
89                    }
90                }
91                // We're missing the whole log so far.
92                None => {
93                    if operation.header.seq_num == 0 {
94                        // We're at the beginning of the log, it's okay if there's no previous
95                        // operation.
96                    } else {
97                        // We didn't prune, there's no previous operation, something is missing.
98                        return Ok(IngestResult::Retry(
99                            operation.header.clone(),
100                            operation.body.clone(),
101                            header_bytes,
102                            operation.header.seq_num,
103                        ));
104                    }
105                }
106            }
107        }
108
109        store
110            .insert_operation(
111                operation.hash,
112                &operation.header,
113                operation.body.as_ref(),
114                &header_bytes,
115                log_id,
116            )
117            .await
118            .map_err(|err| IngestError::StoreError(err.to_string()))?;
119
120        if prune_flag {
121            store
122                .delete_operations(
123                    &operation.header.public_key,
124                    log_id,
125                    operation.header.seq_num,
126                )
127                .await
128                .map_err(|err| IngestError::StoreError(err.to_string()))?;
129        }
130    }
131
132    Ok(IngestResult::Complete(operation))
133}
134
135/// Operations can be ingested directly or need to be re-tried if they arrived out-of-order.
136#[derive(Debug)]
137pub enum IngestResult<E> {
138    /// Operation has been successfully validated and persisted.
139    Complete(Operation<E>),
140
141    /// We're missing previous operations before we can try validating the backlink of this
142    /// operation.
143    ///
144    /// The number indicates how many operations we are lacking before we can attempt validation
145    /// again.
146    Retry(Header<E>, Option<Body>, Vec<u8>, u64),
147
148    /// Operation can be considered "outdated" as a "newer" operation in the log removed this
149    /// operation ("pruning") while we processed it.
150    ///
151    /// Applications usually want to ignore these operations as the latest operation will hold all
152    /// the state we need. Additionally we were also not able to correctly check its log integrity
153    /// anymore.
154    Outdated(Operation<E>),
155}
156
157/// Errors which can occur due to invalid operations or critical storage failures.
158#[derive(Clone, Debug, Error)]
159pub enum IngestError {
160    /// Operation can not be authenticated, has broken log- or payload integrity or doesn't follow
161    /// the p2panda specification.
162    #[error("operation validation failed: {0}")]
163    InvalidOperation(OperationError),
164
165    /// Header did not contain the extensions required by the p2panda specification.
166    #[error("missing \"{0}\" extension in header")]
167    MissingHeaderExtension(String),
168
169    /// Critical storage failure occurred. This is usually a reason to panic.
170    #[error("critical storage failure: {0}")]
171    StoreError(String),
172
173    /// Some implementations might optimistically retry to ingest operations which arrived
174    /// out-of-order. This error comes up when all given attempts have been exhausted.
175    #[error("too many attempts to ingest out-of-order operation ({0} behind in log)")]
176    MaxAttemptsReached(u64),
177}
178
#[cfg(test)]
mod tests {
    use p2panda_core::{Hash, Header, PrivateKey};
    use p2panda_store::MemoryStore;

    use crate::operation::{IngestResult, ingest_operation};
    use crate::test_utils::{Extensions, StreamName};

    /// Builds and signs a test header, returning it together with its encoded bytes.
    fn signed_header(
        private_key: &PrivateKey,
        seq_num: u64,
        backlink: Option<Hash>,
        extensions: Extensions,
    ) -> (Header<Extensions>, Vec<u8>) {
        let mut header = Header {
            public_key: private_key.public_key(),
            version: 1,
            signature: None,
            payload_size: 0,
            payload_hash: None,
            timestamp: 0,
            seq_num,
            backlink,
            previous: vec![],
            extensions,
        };
        header.sign(private_key);
        let header_bytes = header.to_bytes();
        (header, header_bytes)
    }

    #[tokio::test]
    async fn retry_result() {
        let mut store = MemoryStore::<usize, Extensions>::new();
        let private_key = PrivateKey::new();
        let log_id = 1;

        // 1. Create a regular first operation in a log.
        let (header, header_bytes) = signed_header(&private_key, 0, None, Extensions::default());

        let result = ingest_operation(&mut store, header, None, header_bytes, &log_id, false).await;
        assert!(matches!(result, Ok(IngestResult::Complete(_))));

        // 2. Create an operation which has already advanced in the log (it has a backlink and
        //    higher sequence number). We'll be missing 11 operations between the first and this
        //    one.
        let (header, header_bytes) = signed_header(
            &private_key,
            12,
            Some(Hash::new(b"mock operation")),
            Extensions::default(),
        );

        let result = ingest_operation(&mut store, header, None, header_bytes, &log_id, false).await;
        assert!(matches!(result, Ok(IngestResult::Retry(_, None, _, 11))));
    }

    #[tokio::test]
    async fn ignore_outdated_pruned_operations() {
        // Related issue: https://github.com/p2panda/p2panda/issues/711

        let mut store = MemoryStore::<usize, Extensions>::new();
        let private_key = PrivateKey::new();
        let log_id = 1;

        // 1. Create an advanced operation in a log which prunes all previous operations.
        let (header, header_bytes) = signed_header(
            &private_key,
            1,
            Some(Hash::new(b"mock operation")),
            Extensions {
                stream_name: StreamName::new(private_key.public_key(), None),
                prune_flag: true.into(),
            },
        );

        let result = ingest_operation(&mut store, header, None, header_bytes, &log_id, true).await;
        assert!(matches!(result, Ok(IngestResult::Complete(_))));

        // 2. Create an operation which is from an "outdated" seq from before the log was pruned.
        let (header, header_bytes) = signed_header(
            &private_key,
            0,
            None,
            Extensions {
                stream_name: StreamName::new(private_key.public_key(), None),
                prune_flag: false.into(),
            },
        );

        let result = ingest_operation(&mut store, header, None, header_bytes, &log_id, false).await;
        assert!(matches!(result, Ok(IngestResult::Outdated(_))));
    }
}