Skip to main content

crdt_graph/
graph.rs

1use std::fmt::Debug;
2use std::hash::Hash;
3
4use crate::TwoPTwoPGraphError;
5
6/// Common trait for all operation types, providing a unique identifier.
7pub trait TwoPTwoPId<Id> {
8    /// Returns the unique identifier of this operation.
9    fn id(&self) -> &Id;
10}
11
12/// Marker trait for a vertex-add operation.
13pub trait TwoPTwoPAddVertex<Id>: TwoPTwoPId<Id> {}
14
15/// Trait for a vertex-remove operation, linking back to the original add.
16pub trait TwoPTwoPRemoveVertex<Id>: TwoPTwoPId<Id> {
17    /// Returns the ID of the corresponding `addVertex` operation.
18    fn add_vertex_id(&self) -> &Id;
19}
20
21/// Trait for an edge-add operation, specifying source and target vertices.
22pub trait TwoPTwoPAddEdge<Id>: TwoPTwoPId<Id> {
23    /// Returns the source vertex ID.
24    fn source(&self) -> &Id;
25    /// Returns the target vertex ID.
26    fn target(&self) -> &Id;
27}
28
29/// Trait for an edge-remove operation, linking back to the original add.
30pub trait TwoPTwoPRemoveEdge<Id>: TwoPTwoPId<Id> {
31    /// Returns the ID of the corresponding `addEdge` operation.
32    fn add_edge_id(&self) -> &Id;
33}
34
35/// Distinguishes the two phases of an op-based CRDT update.
36pub enum UpdateType {
37    /// Executed only on the originating replica; checks preconditions.
38    AtSource,
39    /// Executed on all replicas; applies the actual state change.
40    Downstream,
41}
42
43/// An update operation that can be applied to a [`TwoPTwoPGraph`].
44#[derive(Clone, Debug)]
45pub enum UpdateOperation<VA, VR, EA, ER> {
46    AddVertex(VA),
47    RemoveVertex(VR),
48    AddEdge(EA),
49    RemoveEdge(ER),
50}
51
52/// An op-based 2P2P-Graph CRDT.
53///
54/// Maintains four sets corresponding to the paper's payload:
55/// - `V_A` — vertices added
56/// - `V_R` — vertices removed
57/// - `E_A` — edges added
58/// - `E_R` — edges removed
59///
60/// Generic parameters:
61/// - `VA` / `VR` — vertex add / remove operation types
62/// - `EA` / `ER` — edge add / remove operation types
63/// - `I` — the shared identifier type
64#[derive(Clone, Debug)]
65pub struct TwoPTwoPGraph<VA, VR, EA, ER, I>
66where
67    VA: Clone + TwoPTwoPAddVertex<I>,
68    VR: Clone + TwoPTwoPRemoveVertex<I>,
69    EA: Clone + TwoPTwoPAddEdge<I>,
70    ER: Clone + TwoPTwoPRemoveEdge<I>,
71    I: Eq + Hash + Debug + Clone,
72{
73    vertices_added: Vec<VA>,
74    vertices_removed: Vec<VR>,
75    edges_added: Vec<EA>,
76    edges_removed: Vec<ER>,
77    _phantom: std::marker::PhantomData<I>,
78}
79
80impl<VA, VR, EA, ER, I> TwoPTwoPGraph<VA, VR, EA, ER, I>
81where
82    VA: Clone + TwoPTwoPAddVertex<I>,
83    VR: Clone + TwoPTwoPRemoveVertex<I>,
84    EA: Clone + TwoPTwoPAddEdge<I>,
85    ER: Clone + TwoPTwoPRemoveEdge<I>,
86    I: Eq + Hash + Debug + Clone,
87{
88    /// Creates an empty graph with all four sets initialized to ∅.
89    pub fn new() -> Self {
90        TwoPTwoPGraph {
91            vertices_added: Vec::new(),
92            vertices_removed: Vec::new(),
93            edges_added: Vec::new(),
94            edges_removed: Vec::new(),
95            _phantom: std::marker::PhantomData,
96        }
97    }
98
99    /// Returns `true` if the vertex is in `V_A \ V_R` (added and not removed).
100    pub fn lookup_vertex(&self, vertex_id: &I) -> bool {
101        self.vertices_added.iter().any(|va| va.id() == vertex_id)
102            && !self
103                .vertices_removed
104                .iter()
105                .any(|vr| vr.add_vertex_id() == vertex_id)
106    }
107
108    /// Returns the edge-add operation referenced by a given edge-remove operation, if present.
109    pub fn get_edge_added_from_remove_edge(&self, remove_edge: &ER) -> Option<&EA> {
110        self.edges_added
111            .iter()
112            .find(|ea| ea.id() == remove_edge.add_edge_id())
113    }
114
115    /// Returns `true` if the edge referenced by `remove_edge` exists in `E_A \ E_R`
116    /// and both of its endpoint vertices are currently in `V_A \ V_R`.
117    pub fn lookup_from_remove_edge(&self, remove_edge: &ER) -> bool {
118        self.get_edge_added_from_remove_edge(remove_edge)
119            .is_some_and(|edge_added| {
120                self.lookup_vertex(edge_added.source())
121                    && self.lookup_vertex(edge_added.target())
122                    && !self
123                        .edges_removed
124                        .iter()
125                        .any(|er| er.add_edge_id() == remove_edge.add_edge_id())
126            })
127    }
128
129    /// Convenience method that calls [`prepare`](Self::prepare) and discards the returned operation.
130    pub fn update_operation(
131        &mut self,
132        update_operation: UpdateOperation<VA, VR, EA, ER>,
133    ) -> Result<(), TwoPTwoPGraphError<I>> {
134        self.prepare(update_operation).map(|_| ())
135    }
136
137    /// Executes atSource precondition checks and applies the downstream effect locally.
138    /// Returns the operation to broadcast to other replicas.
139    pub fn prepare(
140        &mut self,
141        op: UpdateOperation<VA, VR, EA, ER>,
142    ) -> Result<UpdateOperation<VA, VR, EA, ER>, TwoPTwoPGraphError<I>> {
143        let broadcast = op.clone();
144        match op {
145            UpdateOperation::AddVertex(vertex) => self.add_vertex(vertex, UpdateType::AtSource)?,
146            UpdateOperation::AddEdge(edge) => self.add_edge(edge, UpdateType::AtSource)?,
147            UpdateOperation::RemoveVertex(vertex) => {
148                self.remove_vertex(vertex, UpdateType::AtSource)?
149            }
150            UpdateOperation::RemoveEdge(edge) => self.remove_edge(edge, UpdateType::AtSource)?,
151        }
152        Ok(broadcast)
153    }
154
155    /// Applies an operation received from a remote replica (downstream).
156    pub fn apply_downstream(
157        &mut self,
158        op: UpdateOperation<VA, VR, EA, ER>,
159    ) -> Result<(), TwoPTwoPGraphError<I>> {
160        match op {
161            UpdateOperation::AddVertex(vertex) => self.add_vertex(vertex, UpdateType::Downstream),
162            UpdateOperation::AddEdge(edge) => self.add_edge(edge, UpdateType::Downstream),
163            UpdateOperation::RemoveVertex(vertex) => {
164                self.remove_vertex(vertex, UpdateType::Downstream)
165            }
166            UpdateOperation::RemoveEdge(edge) => self.remove_edge(edge, UpdateType::Downstream),
167        }
168    }
169
170    /// Adds a vertex to `V_A`. Fails if a vertex with the same ID already exists.
171    ///
172    /// Both `AtSource` and `Downstream` behave identically (no preconditions per the paper).
173    pub fn add_vertex(
174        &mut self,
175        vertex: VA,
176        _update_type: UpdateType,
177    ) -> Result<(), TwoPTwoPGraphError<I>> {
178        if self.vertices_added.iter().any(|va| va.id() == vertex.id()) {
179            return Err(TwoPTwoPGraphError::VertexAlreadyExists(vertex.id().clone()));
180        }
181        self.vertices_added.push(vertex);
182        Ok(())
183    }
184
185    /// Adds an edge to `E_A`.
186    ///
187    /// - **AtSource**: checks `lookup(source) ∧ lookup(target)`.
188    /// - **Downstream**: skips vertex existence checks (per the paper).
189    pub fn add_edge(
190        &mut self,
191        edge: EA,
192        update_type: UpdateType,
193    ) -> Result<(), TwoPTwoPGraphError<I>> {
194        if matches!(update_type, UpdateType::AtSource) {
195            if !self.lookup_vertex(&edge.source()) {
196                return Err(TwoPTwoPGraphError::VertexDoesNotExists(
197                    edge.source().clone(),
198                ));
199            }
200            if !self.lookup_vertex(&edge.target()) {
201                return Err(TwoPTwoPGraphError::VertexDoesNotExists(
202                    edge.target().clone(),
203                ));
204            }
205        }
206        if self.edges_added.iter().any(|ea| ea.id() == edge.id()) {
207            return Err(TwoPTwoPGraphError::EdgeAlreadyExists(edge.id().clone()));
208        }
209        self.edges_added.push(edge);
210        Ok(())
211    }
212
213    /// Adds a vertex-remove to `V_R`.
214    ///
215    /// - **AtSource**: checks `lookup(w)` and that no active edge references `w`.
216    /// - **Downstream**: checks that the corresponding `addVertex(w)` has been delivered.
217    pub fn remove_vertex(
218        &mut self,
219        vertex: VR,
220        update_type: UpdateType,
221    ) -> Result<(), TwoPTwoPGraphError<I>> {
222        if matches!(update_type, UpdateType::AtSource) {
223            // pre
224            if !self.lookup_vertex(vertex.add_vertex_id()) {
225                return Err(TwoPTwoPGraphError::VertexDoesNotExists(
226                    vertex.add_vertex_id().clone(),
227                ));
228            }
229            // pre: E ⊆ V × V — vertex has no active edges
230            for ea in self.edges_added.iter() {
231                let is_removed = self
232                    .edges_removed
233                    .iter()
234                    .any(|er| ea.id() == er.add_edge_id());
235                if !is_removed
236                    && (ea.source() == vertex.add_vertex_id()
237                        || ea.target() == vertex.add_vertex_id())
238                {
239                    return Err(TwoPTwoPGraphError::VertexHasEdge(
240                        vertex.add_vertex_id().clone(),
241                        ea.id().clone(),
242                    ));
243                }
244            }
245        }
246
247        if matches!(update_type, UpdateType::Downstream) {
248            // pre: addVertex(w) delivered
249            if !self
250                .vertices_added
251                .iter()
252                .any(|va| va.id() == vertex.add_vertex_id())
253            {
254                return Err(TwoPTwoPGraphError::AddVertexNotDelivered(
255                    vertex.add_vertex_id().clone(),
256                ));
257            }
258        }
259
260        if self
261            .vertices_removed
262            .iter()
263            .any(|vr| vr.id() == vertex.id())
264        {
265            return Err(TwoPTwoPGraphError::VertexAlreadyExists(vertex.id().clone()));
266        }
267        self.vertices_removed.push(vertex);
268        Ok(())
269    }
270
271    /// Adds an edge-remove to `E_R`.
272    ///
273    /// - **AtSource**: checks `lookup((u,v))`.
274    /// - **Downstream**: checks that the corresponding `addEdge(u,v)` has been delivered.
275    pub fn remove_edge(
276        &mut self,
277        remove_edge: ER,
278        update_type: UpdateType,
279    ) -> Result<(), TwoPTwoPGraphError<I>> {
280        if matches!(update_type, UpdateType::AtSource) {
281            // pre: lookup((u,v))
282            if !self.lookup_from_remove_edge(&remove_edge) {
283                return Err(TwoPTwoPGraphError::EdgeDoesNotExists(
284                    remove_edge.add_edge_id().clone(),
285                ));
286            }
287        }
288
289        if matches!(update_type, UpdateType::Downstream) {
290            // pre: addEdge(u,v) delivered
291            if !self
292                .edges_added
293                .iter()
294                .any(|ea| ea.id() == remove_edge.add_edge_id())
295            {
296                return Err(TwoPTwoPGraphError::AddEdgeNotDelivered(
297                    remove_edge.add_edge_id().clone(),
298                ));
299            }
300        }
301
302        if self
303            .edges_removed
304            .iter()
305            .any(|er| er.id() == remove_edge.id())
306        {
307            return Err(TwoPTwoPGraphError::EdgeAlreadyExists(
308                remove_edge.id().clone(),
309            ));
310        }
311        self.edges_removed.push(remove_edge);
312        Ok(())
313    }
314
315    /// Converts the current CRDT state into a [`petgraph::graph::DiGraph`].
316    ///
317    /// Only vertices in `V_A \ V_R` and edges in `E_A \ E_R` (whose endpoints
318    /// are present) are included in the resulting directed graph.
319    pub fn generate_petgraph(&self) -> petgraph::graph::DiGraph<VA, EA> {
320        let mut graph = petgraph::graph::DiGraph::new();
321        let mut vertex_map = std::collections::HashMap::new();
322        for va in self.vertices_added.iter() {
323            let is_removed = self
324                .vertices_removed
325                .iter()
326                .any(|vr| va.id() == vr.add_vertex_id());
327            if !is_removed {
328                let vertex = graph.add_node(va.clone());
329                vertex_map.insert(va.id().clone(), vertex);
330            }
331        }
332        for ea in self.edges_added.iter() {
333            let is_removed = self
334                .edges_removed
335                .iter()
336                .any(|er| ea.id() == er.add_edge_id());
337            if !is_removed {
338                if let (Some(&source), Some(&target)) =
339                    (vertex_map.get(ea.source()), vertex_map.get(ea.target()))
340                {
341                    graph.add_edge(source, target, ea.clone());
342                }
343            }
344        }
345        graph
346    }
347}