Skip to main content

crdt_graph/
graph.rs

1use std::fmt::Debug;
2use std::hash::Hash;
3
4use crate::TwoPTwoPGraphError;
5
/// Base trait shared by every operation type: each operation exposes its own identifier.
///
/// `Id` is the identifier type; the graph compares these identifiers to detect
/// duplicates and to match remove operations with the adds they refer to.
pub trait TwoPTwoPId<Id> {
    /// Borrows the identifier that names this operation.
    fn id(&self) -> &Id;
}
11
12/// Marker trait for a vertex-add operation.
13pub trait TwoPTwoPAddVertex<Id>: TwoPTwoPId<Id> {}
14
15/// Trait for a vertex-remove operation, linking back to the original add.
16pub trait TwoPTwoPRemoveVertex<Id>: TwoPTwoPId<Id> {
17    /// Returns the ID of the corresponding `addVertex` operation.
18    fn add_vertex_id(&self) -> &Id;
19}
20
21/// Trait for an edge-add operation, specifying source and target vertices.
22pub trait TwoPTwoPAddEdge<Id>: TwoPTwoPId<Id> {
23    /// Returns the source vertex ID.
24    fn source(&self) -> &Id;
25    /// Returns the target vertex ID.
26    fn target(&self) -> &Id;
27}
28
29/// Trait for an edge-remove operation, linking back to the original add.
30pub trait TwoPTwoPRemoveEdge<Id>: TwoPTwoPId<Id> {
31    /// Returns the ID of the corresponding `addEdge` operation.
32    fn add_edge_id(&self) -> &Id;
33}
34
/// Distinguishes the two phases of an op-based CRDT update.
///
/// The graph's mutating methods take this value to decide which precondition
/// checks to run before recording an operation.
///
/// The enum is a plain tag with no payload, so it is `Copy` and can be
/// compared and debug-printed freely.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum UpdateType {
    /// Executed only on the originating replica; checks preconditions.
    AtSource,
    /// Executed on all replicas; applies the actual state change.
    Downstream,
}
42
/// One of the four operations a [`TwoPTwoPGraph`] accepts.
///
/// Each variant wraps the caller-defined payload type for that kind of
/// operation (`VA`, `VR`, `EA`, `ER` respectively).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum UpdateOperation<VA, VR, EA, ER> {
    /// Record a vertex-add operation.
    AddVertex(VA),
    /// Record a vertex-remove operation.
    RemoveVertex(VR),
    /// Record an edge-add operation.
    AddEdge(EA),
    /// Record an edge-remove operation.
    RemoveEdge(ER),
}
51
52/// An op-based 2P2P-Graph CRDT.
53///
54/// Maintains four sets corresponding to the paper's payload:
55/// - `V_A` — vertices added
56/// - `V_R` — vertices removed
57/// - `E_A` — edges added
58/// - `E_R` — edges removed
59///
60/// Generic parameters:
61/// - `VA` / `VR` — vertex add / remove operation types
62/// - `EA` / `ER` — edge add / remove operation types
63/// - `I` — the shared identifier type
64#[derive(Clone, Debug)]
65pub struct TwoPTwoPGraph<VA, VR, EA, ER, I>
66where
67    VA: Clone + TwoPTwoPAddVertex<I>,
68    VR: Clone + TwoPTwoPRemoveVertex<I>,
69    EA: Clone + TwoPTwoPAddEdge<I>,
70    ER: Clone + TwoPTwoPRemoveEdge<I>,
71    I: Eq + Hash + Debug + Clone,
72{
73    vertices_added: Vec<VA>,
74    vertices_removed: Vec<VR>,
75    edges_added: Vec<EA>,
76    edges_removed: Vec<ER>,
77    _phantom: std::marker::PhantomData<I>,
78}
79
80impl<VA, VR, EA, ER, I> Default for TwoPTwoPGraph<VA, VR, EA, ER, I>
81where
82    VA: Clone + TwoPTwoPAddVertex<I>,
83    VR: Clone + TwoPTwoPRemoveVertex<I>,
84    EA: Clone + TwoPTwoPAddEdge<I>,
85    ER: Clone + TwoPTwoPRemoveEdge<I>,
86    I: Eq + Hash + Debug + Clone,
87{
88    fn default() -> Self {
89        Self::new()
90    }
91}
92
93impl<VA, VR, EA, ER, I> TwoPTwoPGraph<VA, VR, EA, ER, I>
94where
95    VA: Clone + TwoPTwoPAddVertex<I>,
96    VR: Clone + TwoPTwoPRemoveVertex<I>,
97    EA: Clone + TwoPTwoPAddEdge<I>,
98    ER: Clone + TwoPTwoPRemoveEdge<I>,
99    I: Eq + Hash + Debug + Clone,
100{
101    /// Creates an empty graph with all four sets initialized to ∅.
102    pub fn new() -> Self {
103        TwoPTwoPGraph {
104            vertices_added: Vec::new(),
105            vertices_removed: Vec::new(),
106            edges_added: Vec::new(),
107            edges_removed: Vec::new(),
108            _phantom: std::marker::PhantomData,
109        }
110    }
111
112    /// Returns `true` if the vertex is in `V_A \ V_R` (added and not removed).
113    pub fn lookup_vertex(&self, vertex_id: &I) -> bool {
114        self.vertices_added.iter().any(|va| va.id() == vertex_id)
115            && !self
116                .vertices_removed
117                .iter()
118                .any(|vr| vr.add_vertex_id() == vertex_id)
119    }
120
121    /// Returns the edge-add operation referenced by a given edge-remove operation, if present.
122    pub fn get_edge_added_from_remove_edge(&self, remove_edge: &ER) -> Option<&EA> {
123        self.edges_added
124            .iter()
125            .find(|ea| ea.id() == remove_edge.add_edge_id())
126    }
127
128    /// Returns `true` if the edge referenced by `remove_edge` exists in `E_A \ E_R`
129    /// and both of its endpoint vertices are currently in `V_A \ V_R`.
130    pub fn lookup_from_remove_edge(&self, remove_edge: &ER) -> bool {
131        self.get_edge_added_from_remove_edge(remove_edge)
132            .is_some_and(|edge_added| {
133                self.lookup_vertex(edge_added.source())
134                    && self.lookup_vertex(edge_added.target())
135                    && !self
136                        .edges_removed
137                        .iter()
138                        .any(|er| er.add_edge_id() == remove_edge.add_edge_id())
139            })
140    }
141
142    /// Convenience method that calls [`prepare`](Self::prepare) and discards the returned operation.
143    pub fn update_operation(
144        &mut self,
145        update_operation: UpdateOperation<VA, VR, EA, ER>,
146    ) -> Result<(), TwoPTwoPGraphError<I>> {
147        self.prepare(update_operation).map(|_| ())
148    }
149
150    /// Executes atSource precondition checks and applies the downstream effect locally.
151    /// Returns the operation to broadcast to other replicas.
152    pub fn prepare(
153        &mut self,
154        op: UpdateOperation<VA, VR, EA, ER>,
155    ) -> Result<UpdateOperation<VA, VR, EA, ER>, TwoPTwoPGraphError<I>> {
156        let broadcast = op.clone();
157        match op {
158            UpdateOperation::AddVertex(vertex) => self.add_vertex(vertex, UpdateType::AtSource)?,
159            UpdateOperation::AddEdge(edge) => self.add_edge(edge, UpdateType::AtSource)?,
160            UpdateOperation::RemoveVertex(vertex) => {
161                self.remove_vertex(vertex, UpdateType::AtSource)?
162            }
163            UpdateOperation::RemoveEdge(edge) => self.remove_edge(edge, UpdateType::AtSource)?,
164        }
165        Ok(broadcast)
166    }
167
168    /// Applies an operation received from a remote replica (downstream).
169    pub fn apply_downstream(
170        &mut self,
171        op: UpdateOperation<VA, VR, EA, ER>,
172    ) -> Result<(), TwoPTwoPGraphError<I>> {
173        match op {
174            UpdateOperation::AddVertex(vertex) => self.add_vertex(vertex, UpdateType::Downstream),
175            UpdateOperation::AddEdge(edge) => self.add_edge(edge, UpdateType::Downstream),
176            UpdateOperation::RemoveVertex(vertex) => {
177                self.remove_vertex(vertex, UpdateType::Downstream)
178            }
179            UpdateOperation::RemoveEdge(edge) => self.remove_edge(edge, UpdateType::Downstream),
180        }
181    }
182
183    /// Adds a vertex to `V_A`. Fails if a vertex with the same ID already exists.
184    ///
185    /// Both `AtSource` and `Downstream` behave identically (no preconditions per the paper).
186    pub fn add_vertex(
187        &mut self,
188        vertex: VA,
189        _update_type: UpdateType,
190    ) -> Result<(), TwoPTwoPGraphError<I>> {
191        if self.vertices_added.iter().any(|va| va.id() == vertex.id()) {
192            return Err(TwoPTwoPGraphError::VertexAlreadyExists(vertex.id().clone()));
193        }
194        self.vertices_added.push(vertex);
195        Ok(())
196    }
197
198    /// Adds an edge to `E_A`.
199    ///
200    /// - **AtSource**: checks `lookup(source) ∧ lookup(target)`.
201    /// - **Downstream**: skips vertex existence checks (per the paper).
202    pub fn add_edge(
203        &mut self,
204        edge: EA,
205        update_type: UpdateType,
206    ) -> Result<(), TwoPTwoPGraphError<I>> {
207        if matches!(update_type, UpdateType::AtSource) {
208            if !self.lookup_vertex(&edge.source()) {
209                return Err(TwoPTwoPGraphError::VertexDoesNotExists(
210                    edge.source().clone(),
211                ));
212            }
213            if !self.lookup_vertex(&edge.target()) {
214                return Err(TwoPTwoPGraphError::VertexDoesNotExists(
215                    edge.target().clone(),
216                ));
217            }
218        }
219        if self.edges_added.iter().any(|ea| ea.id() == edge.id()) {
220            return Err(TwoPTwoPGraphError::EdgeAlreadyExists(edge.id().clone()));
221        }
222        self.edges_added.push(edge);
223        Ok(())
224    }
225
226    /// Adds a vertex-remove to `V_R`.
227    ///
228    /// - **AtSource**: checks `lookup(w)` and that no active edge references `w`.
229    /// - **Downstream**: checks that the corresponding `addVertex(w)` has been delivered.
230    pub fn remove_vertex(
231        &mut self,
232        vertex: VR,
233        update_type: UpdateType,
234    ) -> Result<(), TwoPTwoPGraphError<I>> {
235        if matches!(update_type, UpdateType::AtSource) {
236            // pre
237            if !self.lookup_vertex(vertex.add_vertex_id()) {
238                return Err(TwoPTwoPGraphError::VertexDoesNotExists(
239                    vertex.add_vertex_id().clone(),
240                ));
241            }
242            // pre: E ⊆ V × V — vertex has no active edges
243            for ea in self.edges_added.iter() {
244                let is_removed = self
245                    .edges_removed
246                    .iter()
247                    .any(|er| ea.id() == er.add_edge_id());
248                if !is_removed
249                    && (ea.source() == vertex.add_vertex_id()
250                        || ea.target() == vertex.add_vertex_id())
251                {
252                    return Err(TwoPTwoPGraphError::VertexHasEdge(
253                        vertex.add_vertex_id().clone(),
254                        ea.id().clone(),
255                    ));
256                }
257            }
258        }
259
260        if matches!(update_type, UpdateType::Downstream) {
261            // pre: addVertex(w) delivered
262            if !self
263                .vertices_added
264                .iter()
265                .any(|va| va.id() == vertex.add_vertex_id())
266            {
267                return Err(TwoPTwoPGraphError::AddVertexNotDelivered(
268                    vertex.add_vertex_id().clone(),
269                ));
270            }
271        }
272
273        if self
274            .vertices_removed
275            .iter()
276            .any(|vr| vr.id() == vertex.id())
277        {
278            return Err(TwoPTwoPGraphError::VertexAlreadyExists(vertex.id().clone()));
279        }
280        self.vertices_removed.push(vertex);
281        Ok(())
282    }
283
284    /// Adds an edge-remove to `E_R`.
285    ///
286    /// - **AtSource**: checks `lookup((u,v))`.
287    /// - **Downstream**: checks that the corresponding `addEdge(u,v)` has been delivered.
288    pub fn remove_edge(
289        &mut self,
290        remove_edge: ER,
291        update_type: UpdateType,
292    ) -> Result<(), TwoPTwoPGraphError<I>> {
293        if matches!(update_type, UpdateType::AtSource) {
294            // pre: lookup((u,v))
295            if !self.lookup_from_remove_edge(&remove_edge) {
296                return Err(TwoPTwoPGraphError::EdgeDoesNotExists(
297                    remove_edge.add_edge_id().clone(),
298                ));
299            }
300        }
301
302        if matches!(update_type, UpdateType::Downstream) {
303            // pre: addEdge(u,v) delivered
304            if !self
305                .edges_added
306                .iter()
307                .any(|ea| ea.id() == remove_edge.add_edge_id())
308            {
309                return Err(TwoPTwoPGraphError::AddEdgeNotDelivered(
310                    remove_edge.add_edge_id().clone(),
311                ));
312            }
313        }
314
315        if self
316            .edges_removed
317            .iter()
318            .any(|er| er.id() == remove_edge.id())
319        {
320            return Err(TwoPTwoPGraphError::EdgeAlreadyExists(
321                remove_edge.id().clone(),
322            ));
323        }
324        self.edges_removed.push(remove_edge);
325        Ok(())
326    }
327
328    /// Converts the current CRDT state into a [`petgraph::graph::DiGraph`].
329    ///
330    /// Only vertices in `V_A \ V_R` and edges in `E_A \ E_R` (whose endpoints
331    /// are present) are included in the resulting directed graph.
332    pub fn generate_petgraph(&self) -> petgraph::graph::DiGraph<VA, EA> {
333        let mut graph = petgraph::graph::DiGraph::new();
334        let mut vertex_map = std::collections::HashMap::new();
335        for va in self.vertices_added.iter() {
336            let is_removed = self
337                .vertices_removed
338                .iter()
339                .any(|vr| va.id() == vr.add_vertex_id());
340            if !is_removed {
341                let vertex = graph.add_node(va.clone());
342                vertex_map.insert(va.id().clone(), vertex);
343            }
344        }
345        for ea in self.edges_added.iter() {
346            let is_removed = self
347                .edges_removed
348                .iter()
349                .any(|er| ea.id() == er.add_edge_id());
350            if !is_removed {
351                if let (Some(&source), Some(&target)) =
352                    (vertex_map.get(ea.source()), vertex_map.get(ea.target()))
353                {
354                    graph.add_edge(source, target, ea.clone());
355                }
356            }
357        }
358        graph
359    }
360
361    /// Returns the number of active vertices (`V_A \ V_R`).
362    pub fn vertex_count(&self) -> usize {
363        self.vertices_added
364            .iter()
365            .filter(|va| {
366                !self
367                    .vertices_removed
368                    .iter()
369                    .any(|vr| va.id() == vr.add_vertex_id())
370            })
371            .count()
372    }
373
374    /// Returns the number of active edges (`E_A \ E_R`).
375    pub fn edge_count(&self) -> usize {
376        self.edges_added
377            .iter()
378            .filter(|ea| {
379                !self
380                    .edges_removed
381                    .iter()
382                    .any(|er| ea.id() == er.add_edge_id())
383            })
384            .count()
385    }
386
387    /// Returns `true` if the graph has no active vertices and no active edges.
388    pub fn is_empty(&self) -> bool {
389        self.vertex_count() == 0 && self.edge_count() == 0
390    }
391
392    /// Returns an iterator over all active (non-removed) vertex-add operations.
393    pub fn vertices(&self) -> impl Iterator<Item = &VA> {
394        self.vertices_added.iter().filter(|va| {
395            !self
396                .vertices_removed
397                .iter()
398                .any(|vr| va.id() == vr.add_vertex_id())
399        })
400    }
401
402    /// Returns an iterator over all active (non-removed) edge-add operations.
403    pub fn edges(&self) -> impl Iterator<Item = &EA> {
404        self.edges_added.iter().filter(|ea| {
405            !self
406                .edges_removed
407                .iter()
408                .any(|er| ea.id() == er.add_edge_id())
409        })
410    }
411}