p2panda_stream/operation.rs
// SPDX-License-Identifier: MIT OR Apache-2.0

//! Methods to handle p2panda operations.

use p2panda_core::{
    validate_backlink, validate_operation, Body, Extensions, Header, Operation, OperationError,
};
use p2panda_store::{LogStore, OperationStore};
use thiserror::Error;

/// Checks an incoming operation for log integrity and persists it into the store when valid.
///
/// This method also automatically prunes the log when the prune flag is set.
///
/// If the operation seems valid but we still lack the information to fully verify it (it might
/// have arrived out-of-order), this method does not fail but indicates that ingest should be
/// retried later.
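///
/// # Example
///
/// A minimal sketch of ingesting an operation into an in-memory store; `create_signed_header`
/// is a hypothetical helper and the `Extensions` type stands in for a concrete extensions
/// implementation (see the tests below for a complete setup):
///
/// ```ignore
/// let mut store = MemoryStore::<u64, Extensions>::new();
/// let (header, header_bytes) = create_signed_header(&private_key);
///
/// match ingest_operation(&mut store, header, None, header_bytes, &0, false).await? {
///     IngestResult::Complete(operation) => println!("persisted {}", operation.hash),
///     IngestResult::Retry(.., num_missing) => println!("{} operations missing", num_missing),
/// }
/// ```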
pub async fn ingest_operation<S, L, E>(
store: &mut S,
header: Header<E>,
body: Option<Body>,
header_bytes: Vec<u8>,
log_id: &L,
prune_flag: bool,
) -> Result<IngestResult<E>, IngestError>
where
S: OperationStore<L, E> + LogStore<L, E>,
E: Extensions,
{
let operation = Operation {
hash: header.hash(),
header,
body,
};
if let Err(err) = validate_operation(&operation) {
return Err(IngestError::InvalidOperation(err));
}
let already_exists = store
.has_operation(operation.hash)
.await
.map_err(|err| IngestError::StoreError(err.to_string()))?;
if !already_exists {
        // If no prune flag is set, we expect the log to have integrity with the previously
        // received operation.
if !prune_flag && operation.header.seq_num > 0 {
let latest_operation = store
.latest_operation(&operation.header.public_key, log_id)
.await
.map_err(|err| IngestError::StoreError(err.to_string()))?;
match latest_operation {
Some(latest_operation) => {
if let Err(err) = validate_backlink(&latest_operation.0, &operation.header) {
match err {
                            // These errors signify that the sequence number is monotonically
                            // incrementing and correct, but that the backlink does not match.
OperationError::BacklinkMismatch
| OperationError::BacklinkMissing
// Log can only contain operations from one author.
| OperationError::TooManyAuthors => {
return Err(IngestError::InvalidOperation(err))
}
// We observe a gap in the log and therefore can't validate the
// backlink yet.
                            OperationError::SeqNumNonIncremental(expected, given) => {
                                return Ok(IngestResult::Retry(
                                    operation.header,
                                    operation.body,
                                    header_bytes,
                                    given - expected,
                                ))
                            }
_ => unreachable!("other error cases have been handled before"),
}
}
}
// We're missing the whole log so far.
None => {
return Ok(IngestResult::Retry(
operation.header.clone(),
operation.body.clone(),
header_bytes,
operation.header.seq_num,
))
}
}
}
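        // The operation is valid and its position in the log has been verified, persist it.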
store
.insert_operation(
operation.hash,
&operation.header,
operation.body.as_ref(),
&header_bytes,
log_id,
)
.await
.map_err(|err| IngestError::StoreError(err.to_string()))?;
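        // The prune flag signals that all operations in this log preceding this one can be
        // removed.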
if prune_flag {
store
.delete_operations(
&operation.header.public_key,
log_id,
operation.header.seq_num,
)
.await
.map_err(|err| IngestError::StoreError(err.to_string()))?;
}
}
Ok(IngestResult::Complete(operation))
}

/// Operations can be ingested directly or might need to be retried if they arrived
/// out-of-order.
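///
/// A consumer might buffer the values returned in `Retry` and attempt ingest again later (a
/// sketch; the queue handling is an assumption of the calling code, not part of this crate):
///
/// ```ignore
/// match ingest_operation(&mut store, header, body, header_bytes, &log_id, false).await? {
///     IngestResult::Complete(operation) => {
///         // Operation was validated and persisted, we can process it now.
///     }
///     IngestResult::Retry(header, body, header_bytes, num_missing) => {
///         // Try again once `num_missing` more operations of this log have arrived.
///         queue.push_back((header, body, header_bytes));
///     }
/// }
/// ```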
#[derive(Debug)]
pub enum IngestResult<E> {
/// Operation has been successfully validated and persisted.
Complete(Operation<E>),

    /// We're missing previous operations before we can try validating the backlink of this
/// operation.
///
/// The number indicates how many operations we are lacking before we can attempt validation
/// again.
Retry(Header<E>, Option<Body>, Vec<u8>, u64),
}

/// Errors which can occur due to invalid operations or critical storage failures.
#[derive(Debug, Error)]
pub enum IngestError {
    /// Operation cannot be authenticated, breaks log or payload integrity, or doesn't follow
    /// the p2panda specification.
#[error("operation validation failed: {0}")]
InvalidOperation(OperationError),

    /// Header did not contain the extensions required by the p2panda specification.
#[error("missing \"{0}\" extension in header")]
MissingHeaderExtension(String),

    /// Critical storage failure occurred. This is usually a reason to panic.
#[error("critical storage failure: {0}")]
StoreError(String),

    /// Some implementations might optimistically retry ingesting operations which arrived
    /// out-of-order. This error comes up when all given attempts have been exhausted.
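    ///
    /// A bounded ingest loop might produce this error as follows (a sketch; `MAX_ATTEMPTS` and
    /// the `attempts` counter are assumptions of the calling code, not part of this crate):
    ///
    /// ```ignore
    /// if let IngestResult::Retry(.., num_missing) = result {
    ///     attempts += 1;
    ///     if attempts > MAX_ATTEMPTS {
    ///         return Err(IngestError::MaxAttemptsReached(num_missing));
    ///     }
    /// }
    /// ```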
#[error("too many attempts to ingest out-of-order operation ({0} behind in log)")]
MaxAttemptsReached(u64),
}

#[cfg(test)]
mod tests {
    use p2panda_core::{Hash, Header, PrivateKey};
    use p2panda_store::MemoryStore;

    use crate::operation::{ingest_operation, IngestResult};
    use crate::test_utils::Extensions;

    #[tokio::test]
async fn retry_result() {
let mut store = MemoryStore::<usize, Extensions>::new();
let private_key = PrivateKey::new();
let log_id = 1;

        // 1. Create a regular first operation in a log.
let mut header = Header {
public_key: private_key.public_key(),
version: 1,
signature: None,
payload_size: 0,
payload_hash: None,
timestamp: 0,
seq_num: 0,
backlink: None,
previous: vec![],
extensions: None,
};
header.sign(&private_key);
let header_bytes = header.to_bytes();
let result = ingest_operation(&mut store, header, None, header_bytes, &log_id, false).await;
assert!(matches!(result, Ok(IngestResult::Complete(_))));

        // 2. Create an operation which has already advanced in the log (it has a backlink and
// higher sequence number).
let mut header = Header {
public_key: private_key.public_key(),
version: 1,
signature: None,
payload_size: 0,
payload_hash: None,
timestamp: 0,
seq_num: 12, // we'll be missing 11 operations between the first and this one
backlink: Some(Hash::new(b"mock operation")),
previous: vec![],
extensions: None,
};
header.sign(&private_key);
let header_bytes = header.to_bytes();
let result = ingest_operation(&mut store, header, None, header_bytes, &log_id, false).await;
assert!(matches!(result, Ok(IngestResult::Retry(_, None, _, 11))));
}
}