1use p2panda_core::{
5 validate_backlink, validate_operation, Body, Extensions, Header, Operation, OperationError,
6};
7use p2panda_store::{LogStore, OperationStore};
8use thiserror::Error;
9
10pub async fn ingest_operation<S, L, E>(
17 store: &mut S,
18 header: Header<E>,
19 body: Option<Body>,
20 header_bytes: Vec<u8>,
21 log_id: &L,
22 prune_flag: bool,
23) -> Result<IngestResult<E>, IngestError>
24where
25 S: OperationStore<L, E> + LogStore<L, E>,
26 E: Extensions,
27{
28 let operation = Operation {
29 hash: header.hash(),
30 header,
31 body,
32 };
33
34 if let Err(err) = validate_operation(&operation) {
35 return Err(IngestError::InvalidOperation(err));
36 }
37
38 let already_exists = store
39 .has_operation(operation.hash)
40 .await
41 .map_err(|err| IngestError::StoreError(err.to_string()))?;
42 if !already_exists {
43 if !prune_flag && operation.header.seq_num > 0 {
46 let latest_operation = store
47 .latest_operation(&operation.header.public_key, log_id)
48 .await
49 .map_err(|err| IngestError::StoreError(err.to_string()))?;
50
51 match latest_operation {
52 Some(latest_operation) => {
53 if let Err(err) = validate_backlink(&latest_operation.0, &operation.header) {
54 match err {
55 OperationError::BacklinkMismatch
58 | OperationError::BacklinkMissing
59 | OperationError::TooManyAuthors => {
61 return Err(IngestError::InvalidOperation(err))
62 }
63 OperationError::SeqNumNonIncremental(expected, given) => {
66 return Ok(IngestResult::Retry(operation.header, operation.body, header_bytes, given - expected))
67 }
68 _ => unreachable!("other error cases have been handled before"),
69 }
70 }
71 }
72 None => {
74 return Ok(IngestResult::Retry(
75 operation.header.clone(),
76 operation.body.clone(),
77 header_bytes,
78 operation.header.seq_num,
79 ))
80 }
81 }
82 }
83
84 store
85 .insert_operation(
86 operation.hash,
87 &operation.header,
88 operation.body.as_ref(),
89 &header_bytes,
90 log_id,
91 )
92 .await
93 .map_err(|err| IngestError::StoreError(err.to_string()))?;
94
95 if prune_flag {
96 store
97 .delete_operations(
98 &operation.header.public_key,
99 log_id,
100 operation.header.seq_num,
101 )
102 .await
103 .map_err(|err| IngestError::StoreError(err.to_string()))?;
104 }
105 }
106
107 Ok(IngestResult::Complete(operation))
108}
109
110#[derive(Debug)]
112pub enum IngestResult<E> {
113 Complete(Operation<E>),
115
116 Retry(Header<E>, Option<Body>, Vec<u8>, u64),
122}
123
124#[derive(Clone, Debug, Error)]
126pub enum IngestError {
127 #[error("operation validation failed: {0}")]
130 InvalidOperation(OperationError),
131
132 #[error("missing \"{0}\" extension in header")]
134 MissingHeaderExtension(String),
135
136 #[error("critical storage failure: {0}")]
138 StoreError(String),
139
140 #[error("too many attempts to ingest out-of-order operation ({0} behind in log)")]
143 MaxAttemptsReached(u64),
144}
145
146#[cfg(test)]
147mod tests {
148 use p2panda_core::{Hash, Header, PrivateKey};
149 use p2panda_store::MemoryStore;
150
151 use crate::operation::{ingest_operation, IngestResult};
152 use crate::test_utils::Extensions;
153
154 #[tokio::test]
155 async fn retry_result() {
156 let mut store = MemoryStore::<usize, Extensions>::new();
157 let private_key = PrivateKey::new();
158 let log_id = 1;
159
160 let mut header = Header {
162 public_key: private_key.public_key(),
163 version: 1,
164 signature: None,
165 payload_size: 0,
166 payload_hash: None,
167 timestamp: 0,
168 seq_num: 0,
169 backlink: None,
170 previous: vec![],
171 extensions: None,
172 };
173 header.sign(&private_key);
174 let header_bytes = header.to_bytes();
175
176 let result = ingest_operation(&mut store, header, None, header_bytes, &log_id, false).await;
177 assert!(matches!(result, Ok(IngestResult::Complete(_))));
178
179 let mut header = Header {
182 public_key: private_key.public_key(),
183 version: 1,
184 signature: None,
185 payload_size: 0,
186 payload_hash: None,
187 timestamp: 0,
188 seq_num: 12, backlink: Some(Hash::new(b"mock operation")),
190 previous: vec![],
191 extensions: None,
192 };
193 header.sign(&private_key);
194 let header_bytes = header.to_bytes();
195
196 let result = ingest_operation(&mut store, header, None, header_bytes, &log_id, false).await;
197 assert!(matches!(result, Ok(IngestResult::Retry(_, None, _, 11))));
198 }
199}