p2panda_stream/operation.rs
// SPDX-License-Identifier: MIT OR Apache-2.0

//! Methods to handle p2panda operations.

use p2panda_core::{
    Body, Extensions, Header, Operation, OperationError, validate_backlink, validate_operation,
};
use p2panda_store::{LogStore, OperationStore};
use thiserror::Error;

/// Checks an incoming operation for log integrity and persists it into the store when valid.
///
/// This method also automatically prunes the log when the prune flag is set.
///
/// If the operation seems valid but we still lack the information to fully verify it (it might
/// have arrived out-of-order), this method does not fail but indicates that the caller should
/// retry later.
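///
/// # Example
///
/// A minimal usage sketch (not compiled here), assuming an in-memory store with a `usize` log
/// id and a unit `()` extensions type, mirroring the tests below:
///
/// ```ignore
/// use p2panda_core::{Header, PrivateKey};
/// use p2panda_store::MemoryStore;
///
/// let mut store = MemoryStore::<usize, ()>::new();
/// let private_key = PrivateKey::new();
///
/// // Create and sign the first header of a new log.
/// let mut header = Header {
///     public_key: private_key.public_key(),
///     version: 1,
///     signature: None,
///     payload_size: 0,
///     payload_hash: None,
///     timestamp: 0,
///     seq_num: 0,
///     backlink: None,
///     previous: vec![],
///     extensions: (),
/// };
/// header.sign(&private_key);
/// let header_bytes = header.to_bytes();
///
/// // A first operation (without a backlink) should ingest as "complete".
/// let result = ingest_operation(&mut store, header, None, header_bytes, &1, false).await;
/// assert!(matches!(result, Ok(IngestResult::Complete(_))));
/// ```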
pub async fn ingest_operation<S, L, E>(
    store: &mut S,
    header: Header<E>,
    body: Option<Body>,
    header_bytes: Vec<u8>,
    log_id: &L,
    prune_flag: bool,
) -> Result<IngestResult<E>, IngestError>
where
    S: OperationStore<L, E> + LogStore<L, E>,
    E: Extensions,
{
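    // Assemble the operation: its hash is derived from its header.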
    let operation = Operation {
        hash: header.hash(),
        header,
        body,
    };

    if let Err(err) = validate_operation(&operation) {
        return Err(IngestError::InvalidOperation(err));
    }

    let already_exists = store
        .has_operation(operation.hash)
        .await
        .map_err(|err| IngestError::StoreError(err.to_string()))?;
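    // Operations we already know about are skipped entirely and reported as "complete".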
    if !already_exists {
        // If the prune flag is not set, we expect the new operation to maintain the log's
        // integrity with the latest known operation.
        if !prune_flag {
            let latest_operation = store
                .latest_operation(&operation.header.public_key, log_id)
                .await
                .map_err(|err| IngestError::StoreError(err.to_string()))?;

            match latest_operation {
                Some(latest_operation) => {
                    if let Err(err) = validate_backlink(&latest_operation.0, &operation.header) {
                        match err {
                            // These errors signify that the sequence number is monotonically
                            // incrementing and correct, however the backlink does not match.
                            OperationError::BacklinkMismatch
                            | OperationError::BacklinkMissing
                            // A log can only contain operations from one author.
                            | OperationError::TooManyAuthors => {
                                return Err(IngestError::InvalidOperation(err));
                            }
                            // We observe a gap in the log and therefore can't validate the
                            // backlink yet.
                            OperationError::SeqNumNonIncremental(expected, given) => {
                                // Overflows happen when the given operation is already
                                // "outdated": it was pruned while our log grew in the meantime
                                // (thus "latest operation" returns a higher sequence number).
                                // This is a race condition which can occur in this async
                                // setting.
                                //
                                // We can safely ignore the operation here. It will _not_ be
                                // persisted as it technically was pruned already, but we still
                                // forward it to the application layer. The application can
                                // decide if it should ignore or handle the "outdated"
                                // operation.
                                //
                                // TODO(adz): This implementation would show undefined behaviour
                                // when handling forks in pruned logs in the presence of this
                                // race condition. This is a rare edge case, but it needs to be
                                // dealt with.
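                                //
                                // Example: if the log expects seq_num 1 next but the incoming
                                // operation carries seq_num 12, we are "behind" by 11; if it
                                // carries a sequence number lower than expected, the
                                // subtraction overflows and the operation is "outdated".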
                                let (behind, overflow) = given.overflowing_sub(expected);
                                if overflow {
                                    return Ok(IngestResult::Outdated(operation));
                                } else {
                                    return Ok(IngestResult::Retry(
                                        operation.header,
                                        operation.body,
                                        header_bytes,
                                        behind,
                                    ));
                                }
                            }
                            _ => unreachable!("other error cases have been handled before"),
                        }
                    }
                }
                // We're missing the whole log so far.
                None => {
                    if operation.header.seq_num == 0 {
                        // We're at the beginning of the log, so it's okay that there's no
                        // previous operation.
                    } else {
                        // We didn't prune and there's no previous operation, so something is
                        // missing.
                        return Ok(IngestResult::Retry(
                            operation.header.clone(),
                            operation.body.clone(),
                            header_bytes,
                            operation.header.seq_num,
                        ));
                    }
                }
            }
        }

        store
            .insert_operation(
                operation.hash,
                &operation.header,
                operation.body.as_ref(),
                &header_bytes,
                log_id,
            )
            .await
            .map_err(|err| IngestError::StoreError(err.to_string()))?;

        // The prune flag indicates that all operations preceding this one in the log can be
        // removed.
        if prune_flag {
            store
                .delete_operations(
                    &operation.header.public_key,
                    log_id,
                    operation.header.seq_num,
                )
                .await
                .map_err(|err| IngestError::StoreError(err.to_string()))?;
        }
    }

    Ok(IngestResult::Complete(operation))
}

/// Operations can be ingested directly or need to be retried if they arrived out-of-order.
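///
/// A sketch of how a caller might branch on the result; `result` is assumed to come from
/// [`ingest_operation`] above:
///
/// ```ignore
/// match result? {
///     // The operation is now persisted in the store.
///     IngestResult::Complete(operation) => { /* hand over to the application */ }
///     // Buffer header, body and header bytes and re-ingest once the missing
///     // `num_missing` operations have arrived.
///     IngestResult::Retry(header, body, header_bytes, num_missing) => { /* ... */ }
///     // Usually safe to ignore: a newer operation has pruned this one away.
///     IngestResult::Outdated(operation) => { /* ... */ }
/// }
/// ```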
#[derive(Debug)]
pub enum IngestResult<E> {
    /// Operation has been successfully validated and persisted.
    Complete(Operation<E>),

    /// We're missing previous operations before we can try validating the backlink of this
    /// operation.
    ///
    /// The number indicates how many operations we are lacking before we can attempt validation
    /// again.
    Retry(Header<E>, Option<Body>, Vec<u8>, u64),

    /// Operation can be considered "outdated" as a "newer" operation in the log removed this
    /// operation ("pruning") while we were processing it.
    ///
    /// Applications usually want to ignore these operations as the latest operation holds all
    /// the state we need. Additionally, we were not able to fully verify its log integrity
    /// anymore.
    Outdated(Operation<E>),
}

/// Errors which can occur due to invalid operations or critical storage failures.
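///
/// For example, a hypothetical retry loop around [`ingest_operation`] might exhaust a maximum
/// number of attempts before giving up with [`IngestError::MaxAttemptsReached`]
/// (`MAX_ATTEMPTS` and `wait_for_more_operations` are placeholders, not part of this crate):
///
/// ```ignore
/// let mut attempts = 0;
/// loop {
///     match ingest_operation(&mut store, header, body, header_bytes, &log_id, false).await? {
///         IngestResult::Retry(h, b, bytes, num_missing) => {
///             attempts += 1;
///             if attempts >= MAX_ATTEMPTS {
///                 return Err(IngestError::MaxAttemptsReached(num_missing));
///             }
///             // Keep the parts for the next attempt once more operations arrived.
///             (header, body, header_bytes) = (h, b, bytes);
///             wait_for_more_operations().await;
///         }
///         result => break result,
///     }
/// }
/// ```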
#[derive(Clone, Debug, Error)]
pub enum IngestError {
    /// Operation cannot be authenticated, has broken log- or payload integrity or doesn't
    /// follow the p2panda specification.
    #[error("operation validation failed: {0}")]
    InvalidOperation(OperationError),

    /// Header did not contain the extensions required by the p2panda specification.
    #[error("missing \"{0}\" extension in header")]
    MissingHeaderExtension(String),

    /// Critical storage failure occurred. This is usually a reason to panic.
    #[error("critical storage failure: {0}")]
    StoreError(String),

    /// Some implementations might optimistically retry ingesting operations which arrived
    /// out-of-order. This error comes up when all given attempts have been exhausted.
    #[error("too many attempts to ingest out-of-order operation ({0} behind in log)")]
    MaxAttemptsReached(u64),
}

#[cfg(test)]
mod tests {
    use p2panda_core::{Hash, Header, PrivateKey};
    use p2panda_store::MemoryStore;

    use crate::operation::{IngestResult, ingest_operation};
    use crate::test_utils::{Extensions, StreamName};

    #[tokio::test]
    async fn retry_result() {
        let mut store = MemoryStore::<usize, Extensions>::new();
        let private_key = PrivateKey::new();
        let log_id = 1;

        // 1. Create a regular first operation in a log.
        let mut header = Header {
            public_key: private_key.public_key(),
            version: 1,
            signature: None,
            payload_size: 0,
            payload_hash: None,
            timestamp: 0,
            seq_num: 0,
            backlink: None,
            previous: vec![],
            extensions: Extensions::default(),
        };
        header.sign(&private_key);
        let header_bytes = header.to_bytes();

        let result = ingest_operation(&mut store, header, None, header_bytes, &log_id, false).await;
        assert!(matches!(result, Ok(IngestResult::Complete(_))));

        // 2. Create an operation which has already advanced in the log (it has a backlink and
        // a higher sequence number).
        let mut header = Header {
            public_key: private_key.public_key(),
            version: 1,
            signature: None,
            payload_size: 0,
            payload_hash: None,
            timestamp: 0,
            seq_num: 12, // We'll be missing 11 operations between the first and this one.
            backlink: Some(Hash::new(b"mock operation")),
            previous: vec![],
            extensions: Extensions::default(),
        };
        header.sign(&private_key);
        let header_bytes = header.to_bytes();

        let result = ingest_operation(&mut store, header, None, header_bytes, &log_id, false).await;
        assert!(matches!(result, Ok(IngestResult::Retry(_, None, _, 11))));
    }

    #[tokio::test]
    async fn ignore_outdated_pruned_operations() {
        // Related issue: https://github.com/p2panda/p2panda/issues/711

        let mut store = MemoryStore::<usize, Extensions>::new();
        let private_key = PrivateKey::new();
        let log_id = 1;

        // 1. Create an advanced operation in a log which prunes all previous operations.
        let mut header = Header {
            public_key: private_key.public_key(),
            version: 1,
            signature: None,
            payload_size: 0,
            payload_hash: None,
            timestamp: 0,
            seq_num: 1,
            backlink: Some(Hash::new(b"mock operation")),
            previous: vec![],
            extensions: Extensions {
                stream_name: StreamName::new(private_key.public_key(), None),
                prune_flag: true.into(),
            },
        };
        header.sign(&private_key);
        let header_bytes = header.to_bytes();

        let result = ingest_operation(&mut store, header, None, header_bytes, &log_id, true).await;
        assert!(matches!(result, Ok(IngestResult::Complete(_))));

        // 2. Create an operation with an "outdated" sequence number from before the log was
        // pruned.
        let mut header = Header {
            public_key: private_key.public_key(),
            version: 1,
            signature: None,
            payload_size: 0,
            payload_hash: None,
            timestamp: 0,
            seq_num: 0,
            backlink: None,
            previous: vec![],
            extensions: Extensions {
                stream_name: StreamName::new(private_key.public_key(), None),
                prune_flag: false.into(),
            },
        };
        header.sign(&private_key);
        let header_bytes = header.to_bytes();

        let result = ingest_operation(&mut store, header, None, header_bytes, &log_id, false).await;
        assert!(matches!(result, Ok(IngestResult::Outdated(_))));
    }
}
285}