p2panda_stream/ordering/partial/
operations.rs1use std::marker::PhantomData;
4
5use p2panda_core::{Extensions, Hash, Operation};
6use p2panda_store::{LogStore, OperationStore};
7use thiserror::Error;
8
9use crate::ordering::partial::{
10 PartialOrder as InnerPartialOrder, PartialOrderError, PartialOrderStore,
11};
12
13#[derive(Debug)]
20pub struct PartialOrder<L, E, OS, POS> {
21 operation_store: OS,
26
27 inner: InnerPartialOrder<Hash, POS>,
29
30 _phantom: PhantomData<(L, E)>,
31}
32
33impl<L, E, OS, POS> PartialOrder<L, E, OS, POS>
34where
35 OS: OperationStore<L, E> + LogStore<L, E>,
36 POS: PartialOrderStore<Hash>,
37 E: Extensions,
38{
39 pub fn new(operation_store: OS, partial_order_store: POS) -> Self {
41 let inner_dependency_checker = InnerPartialOrder::new(partial_order_store);
42 PartialOrder {
43 operation_store,
44 inner: inner_dependency_checker,
45 _phantom: PhantomData,
46 }
47 }
48
49 pub async fn process(
51 &mut self,
52 operation: Operation<E>,
53 ) -> Result<(), OperationDependencyCheckerError> {
54 let hash = operation.hash;
55 let previous = operation.header.previous.clone();
56 self.inner.process(hash, &previous).await?;
57 Ok(())
58 }
59
60 pub async fn next(&mut self) -> Result<Option<Operation<E>>, OperationDependencyCheckerError> {
62 let Some(hash) = self.inner.next().await? else {
63 return Ok(None);
64 };
65
66 if let Some((header, body)) = self
67 .operation_store
68 .get_operation(hash)
69 .await
70 .map_err(|err| OperationDependencyCheckerError::StoreError(err.to_string()))?
71 {
72 let operation = Operation { hash, header, body };
73 Ok(Some(operation))
74 } else {
75 Err(OperationDependencyCheckerError::MissingOperation(hash))
76 }
77 }
78}
79
80#[derive(Debug, Error)]
81pub enum OperationDependencyCheckerError {
82 #[error(transparent)]
83 CheckerError(#[from] PartialOrderError),
84
85 #[error("store error: {0}")]
86 StoreError(String),
87
88 #[error("processed operation not found in store: {0}")]
89 MissingOperation(Hash),
90}
91
92#[cfg(test)]
93mod tests {
94 use std::collections::HashSet;
95
96 use p2panda_core::{Header, Operation, PrivateKey};
97 use p2panda_store::{MemoryStore, OperationStore};
98
99 use crate::ordering::partial::MemoryStore as PartialOrderMemoryStore;
100
101 use super::PartialOrder;
102
103 async fn setup(
111 private_key: &PrivateKey,
112 operation_store: &mut MemoryStore<u64>,
113 ) -> Vec<Operation> {
114 let mut header_0 = Header {
115 public_key: private_key.public_key(),
116 timestamp: 0,
117 ..Default::default()
118 };
119 header_0.sign(private_key);
120 let operation_0 = Operation {
121 hash: header_0.hash(),
122 header: header_0.clone(),
123 body: None,
124 };
125
126 let mut header_1 = Header {
127 public_key: private_key.public_key(),
128 previous: vec![header_0.hash()],
129 timestamp: 1,
130 ..Default::default()
131 };
132 header_1.sign(private_key);
133 let operation_1 = Operation {
134 hash: header_1.hash(),
135 header: header_1.clone(),
136 body: None,
137 };
138
139 let mut header_2 = Header {
140 public_key: private_key.public_key(),
141 previous: vec![header_0.hash()],
142 timestamp: 2,
143 ..Default::default()
144 };
145 header_2.sign(private_key);
146 let operation_2 = Operation {
147 hash: header_2.hash(),
148 header: header_2.clone(),
149 body: None,
150 };
151
152 let mut header_3 = Header {
153 public_key: private_key.public_key(),
154 previous: vec![header_0.hash()],
155 timestamp: 3,
156 ..Default::default()
157 };
158 header_3.sign(private_key);
159 let operation_3 = Operation {
160 hash: header_3.hash(),
161 header: header_3.clone(),
162 body: None,
163 };
164
165 let mut header_4 = Header {
166 public_key: private_key.public_key(),
167 previous: vec![header_1.hash(), header_2.hash(), header_3.hash()],
168 timestamp: 4,
169 ..Default::default()
170 };
171 header_4.sign(private_key);
172 let operation_4 = Operation {
173 hash: header_4.hash(),
174 header: header_4.clone(),
175 body: None,
176 };
177
178 operation_store
179 .insert_operation(header_0.hash(), &header_0, None, &header_0.to_bytes(), &0)
180 .await
181 .unwrap();
182
183 operation_store
184 .insert_operation(header_1.hash(), &header_1, None, &header_1.to_bytes(), &1)
185 .await
186 .unwrap();
187
188 operation_store
189 .insert_operation(header_2.hash(), &header_2, None, &header_2.to_bytes(), &2)
190 .await
191 .unwrap();
192
193 operation_store
194 .insert_operation(header_3.hash(), &header_3, None, &header_3.to_bytes(), &3)
195 .await
196 .unwrap();
197
198 operation_store
199 .insert_operation(header_4.hash(), &header_4, None, &header_4.to_bytes(), &4)
200 .await
201 .unwrap();
202
203 vec![
204 operation_0,
205 operation_1,
206 operation_2,
207 operation_3,
208 operation_4,
209 ]
210 }
211
212 #[tokio::test]
213 async fn operations_with_previous() {
214 let private_key = PrivateKey::new();
215 let mut operation_store = MemoryStore::<u64>::default();
216 let partial_order_store = PartialOrderMemoryStore::default();
217 let mut dependency_checker =
218 PartialOrder::new(operation_store.clone(), partial_order_store);
219
220 let operations = setup(&private_key, &mut operation_store).await;
227
228 let result = dependency_checker.process(operations[4].clone()).await;
230 assert!(result.is_ok());
231
232 let result = dependency_checker.process(operations[3].clone()).await;
233 assert!(result.is_ok());
234
235 let result = dependency_checker.process(operations[2].clone()).await;
236 assert!(result.is_ok());
237
238 let result = dependency_checker.process(operations[1].clone()).await;
239 assert!(result.is_ok());
240
241 let result = dependency_checker.process(operations[0].clone()).await;
242 assert!(result.is_ok());
243
244 let next = dependency_checker.next().await.unwrap();
247 assert_eq!(next.unwrap(), operations[0]);
248
249 let mut concurrent_operations =
252 HashSet::from([operations[1].hash, operations[2].hash, operations[3].hash]);
253
254 let next = dependency_checker.next().await.unwrap();
255 assert!(next.is_some());
256 let next = next.unwrap();
257 assert!(concurrent_operations.remove(&next.hash),);
258
259 let next = dependency_checker.next().await.unwrap();
260 assert!(next.is_some());
261 let next = next.unwrap();
262 assert!(concurrent_operations.remove(&next.hash),);
263
264 let next = dependency_checker.next().await.unwrap();
265 assert!(next.is_some());
266 let next = next.unwrap();
267 assert!(concurrent_operations.remove(&next.hash),);
268
269 let next = dependency_checker.next().await.unwrap();
271 assert_eq!(next.unwrap(), operations[4]);
272 }
273
274 #[tokio::test]
275 async fn missing_dependency() {
276 let private_key = PrivateKey::new();
277 let mut operation_store = MemoryStore::<u64>::default();
278 let partial_order_store = PartialOrderMemoryStore::default();
279 let mut dependency_checker =
280 PartialOrder::new(operation_store.clone(), partial_order_store);
281
282 let operations = setup(&private_key, &mut operation_store).await;
283
284 let result = dependency_checker.process(operations[1].clone()).await;
285 assert!(result.is_ok());
286 let result = dependency_checker.process(operations[2].clone()).await;
287 assert!(result.is_ok());
288 let result = dependency_checker.process(operations[3].clone()).await;
289 assert!(result.is_ok());
290 let result = dependency_checker.process(operations[4].clone()).await;
291 assert!(result.is_ok());
292
293 let next = dependency_checker.next().await.unwrap();
294 assert!(next.is_none());
295 }
296}