p2panda_stream/ordering/partial/
operations.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::marker::PhantomData;
4
5use p2panda_core::{Extensions, Hash, Operation};
6use p2panda_store::{LogStore, OperationStore};
7use thiserror::Error;
8
9use crate::ordering::partial::{
10    PartialOrder as InnerPartialOrder, PartialOrderError, PartialOrderStore,
11};
12
13/// Struct for processing p2panda operations into a partial order based on dependencies expressed
14/// in their `previous` field.
15///
16/// This struct is a thin wrapper around ordering::PartialOrder struct which takes care of sorting
17/// the operation dependency graph into a partial order. Here we have the addition of a `LogStore`
18/// and `OperationStore` implementation (traits from `p2panda-store`).
19#[derive(Debug)]
20pub struct PartialOrder<L, E, OS, POS> {
21    /// A store containing p2panda operations.
22    ///
23    /// It is assumed that any operations being processed by the PartialOrder struct are already
24    /// present in the store, as we may need to retrieve them later.
25    operation_store: OS,
26
27    /// The inner PartialOrder struct which sorts the operation DAG into a partial order.
28    inner: InnerPartialOrder<Hash, POS>,
29
30    _phantom: PhantomData<(L, E)>,
31}
32
33impl<L, E, OS, POS> PartialOrder<L, E, OS, POS>
34where
35    OS: OperationStore<L, E> + LogStore<L, E>,
36    POS: PartialOrderStore<Hash>,
37    E: Extensions,
38{
39    /// Construct a new dependency checker from an operation store and dependency store.
40    pub fn new(operation_store: OS, partial_order_store: POS) -> Self {
41        let inner_dependency_checker = InnerPartialOrder::new(partial_order_store);
42        PartialOrder {
43            operation_store,
44            inner: inner_dependency_checker,
45            _phantom: PhantomData,
46        }
47    }
48
49    /// Process a single operation.
50    pub async fn process(
51        &mut self,
52        operation: Operation<E>,
53    ) -> Result<(), OperationDependencyCheckerError> {
54        let hash = operation.hash;
55        let previous = operation.header.previous.clone();
56        self.inner.process(hash, &previous).await?;
57        Ok(())
58    }
59
60    /// Take the next ready operation from the queue.
61    pub async fn next(&mut self) -> Result<Option<Operation<E>>, OperationDependencyCheckerError> {
62        let Some(hash) = self.inner.next().await? else {
63            return Ok(None);
64        };
65
66        if let Some((header, body)) = self
67            .operation_store
68            .get_operation(hash)
69            .await
70            .map_err(|err| OperationDependencyCheckerError::StoreError(err.to_string()))?
71        {
72            let operation = Operation { hash, header, body };
73            Ok(Some(operation))
74        } else {
75            Err(OperationDependencyCheckerError::MissingOperation(hash))
76        }
77    }
78}
79
80#[derive(Debug, Error)]
81pub enum OperationDependencyCheckerError {
82    #[error(transparent)]
83    CheckerError(#[from] PartialOrderError),
84
85    #[error("store error: {0}")]
86    StoreError(String),
87
88    #[error("processed operation not found in store: {0}")]
89    MissingOperation(Hash),
90}
91
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use p2panda_core::{Hash, Header, Operation, PrivateKey};
    use p2panda_store::{MemoryStore, OperationStore};

    use crate::ordering::partial::MemoryStore as PartialOrderMemoryStore;

    use super::PartialOrder;

    /// Create and sign a body-less operation pointing at `previous` and insert it into the store.
    ///
    /// `index` is used both as the header timestamp and as the id of the log the operation is
    /// inserted into, so every operation ends up in its own log.
    async fn create_operation(
        private_key: &PrivateKey,
        operation_store: &mut MemoryStore<u64>,
        previous: Vec<Hash>,
        index: u64,
    ) -> Operation {
        let mut header = Header {
            public_key: private_key.public_key(),
            previous,
            timestamp: index,
            ..Default::default()
        };
        header.sign(private_key);
        let hash = header.hash();

        operation_store
            .insert_operation(hash, &header, None, &header.to_bytes(), &index)
            .await
            .unwrap();

        Operation {
            hash,
            header,
            body: None,
        }
    }

    /// Create operations which form the following graph with their previous links:
    ///
    /// OP0 <-- OP1 <--\
    ///     \-- OP2 <-- OP4
    ///      \--OP3 <--/
    ///
    /// Each operation is inserted into the store in its own log. The returned vector is indexed
    /// by operation number (`operations[0]` is OP0 and so on).
    async fn setup(
        private_key: &PrivateKey,
        operation_store: &mut MemoryStore<u64>,
    ) -> Vec<Operation> {
        // OP0 is the root of the graph and has no dependencies.
        let operation_0 = create_operation(private_key, operation_store, vec![], 0).await;

        // OP1, OP2 and OP3 are concurrent, each of them only depends on OP0.
        let operation_1 =
            create_operation(private_key, operation_store, vec![operation_0.hash], 1).await;
        let operation_2 =
            create_operation(private_key, operation_store, vec![operation_0.hash], 2).await;
        let operation_3 =
            create_operation(private_key, operation_store, vec![operation_0.hash], 3).await;

        // OP4 merges the three concurrent branches.
        let operation_4 = create_operation(
            private_key,
            operation_store,
            vec![operation_1.hash, operation_2.hash, operation_3.hash],
            4,
        )
        .await;

        vec![
            operation_0,
            operation_1,
            operation_2,
            operation_3,
            operation_4,
        ]
    }

    #[tokio::test]
    async fn operations_with_previous() {
        let private_key = PrivateKey::new();
        let mut operation_store = MemoryStore::<u64>::default();
        let partial_order_store = PartialOrderMemoryStore::default();
        let mut dependency_checker =
            PartialOrder::new(operation_store.clone(), partial_order_store);

        // Setup test data in the store. They form a dependency graph with the following form:
        //
        // OP0 <-- OP1 <--\
        //     \-- OP2 <-- OP4
        //      \--OP3 <--/
        //
        let operations = setup(&private_key, &mut operation_store).await;

        // Process each operation out-of-order, starting with the tip of the graph and ending
        // with its root.
        for index in [4, 3, 2, 1, 0] {
            let result = dependency_checker.process(operations[index].clone()).await;
            assert!(result.is_ok());
        }

        // Calling next should give us the first operation which had all its dependencies met, in
        // this case it's the root of the graph OP0.
        let next = dependency_checker.next().await.unwrap();
        assert_eq!(next.unwrap(), operations[0]);

        // Operations OP1, OP2 and OP3 all only depend on operation OP0 and they were created
        // concurrently, we should see all of these operations next, but we don't know in what
        // order.
        let mut concurrent_operations =
            HashSet::from([operations[1].hash, operations[2].hash, operations[3].hash]);

        for _ in 0..3 {
            let next = dependency_checker.next().await.unwrap();
            assert!(next.is_some());
            let next = next.unwrap();
            assert!(concurrent_operations.remove(&next.hash));
        }

        // We know OP4 will be given last as it depended on OP1, OP2 and OP3.
        let next = dependency_checker.next().await.unwrap();
        assert_eq!(next.unwrap(), operations[4]);
    }

    #[tokio::test]
    async fn missing_dependency() {
        let private_key = PrivateKey::new();
        let mut operation_store = MemoryStore::<u64>::default();
        let partial_order_store = PartialOrderMemoryStore::default();
        let mut dependency_checker =
            PartialOrder::new(operation_store.clone(), partial_order_store);

        let operations = setup(&private_key, &mut operation_store).await;

        // Process every operation except the root OP0, which all others depend on directly or
        // indirectly.
        for index in [1, 2, 3, 4] {
            let result = dependency_checker.process(operations[index].clone()).await;
            assert!(result.is_ok());
        }

        // As OP0 was never processed, none of the other operations can become ready.
        let next = dependency_checker.next().await.unwrap();
        assert!(next.is_none());
    }
}