graphannis_core/graph/
update.rs

1//! Types used to describe updates on graphs.
2
3use std::convert::TryInto;
4use std::fs::File;
5use std::sync::Mutex;
6
7use crate::errors::{GraphAnnisCoreError, Result};
8use crate::serializer::KeySerializer;
9use bincode::Options;
10use facet::Facet;
11use serde::de::Error as DeserializeError;
12use serde::de::{MapAccess, Visitor};
13use serde::ser::{Error as SerializeError, SerializeMap};
14use serde::{Deserialize, Deserializer, Serialize, Serializer};
15use sstable::{SSIterator, Table, TableBuilder, TableIterator};
16use tempfile::NamedTempFile;
17
18/// Describes a single update on the graph.
19#[derive(Serialize, Deserialize, Clone, Debug, Facet, PartialEq)]
20#[repr(u8)]
21pub enum UpdateEvent {
22    /// Add a node with a name and type.
23    AddNode {
24        node_name: String,
25        node_type: String,
26    },
27    /// Delete a node given by the name.
28    DeleteNode { node_name: String },
29    /// Add a label to a the node given by the name.
30    AddNodeLabel {
31        node_name: String,
32        anno_ns: String,
33        anno_name: String,
34        anno_value: String,
35    },
36    /// Delete a label of an node given by the name of the node and the qualified label name.
37    DeleteNodeLabel {
38        node_name: String,
39        anno_ns: String,
40        anno_name: String,
41    },
42    /// Add an edge between two nodes given by their name.
43    AddEdge {
44        source_node: String,
45        target_node: String,
46        layer: String,
47        component_type: String,
48        component_name: String,
49    },
50    /// Delete an existing edge between two nodes given by their name.
51    DeleteEdge {
52        source_node: String,
53        target_node: String,
54        layer: String,
55        component_type: String,
56        component_name: String,
57    },
58    /// Add a label to an edge between two nodes.
59    AddEdgeLabel {
60        source_node: String,
61        target_node: String,
62        layer: String,
63        component_type: String,
64        component_name: String,
65        anno_ns: String,
66        anno_name: String,
67        anno_value: String,
68    },
69    /// Delete a label from an edge between two nodes.
70    DeleteEdgeLabel {
71        source_node: String,
72        target_node: String,
73        layer: String,
74        component_type: String,
75        component_name: String,
76        anno_ns: String,
77        anno_name: String,
78    },
79}
80
/// A batch of bincode-serialized update events stored as an sstable in a temporary file.
enum ChangeSet {
    /// Events can still be appended through the table builder.
    InProgress {
        /// Builder writing the table into a re-opened handle of `outfile`.
        table_builder: Box<TableBuilder<File>>,
        /// Temporary file backing the table that is being built.
        outfile: NamedTempFile,
    },
    /// The table has been written completely and can be read.
    Finished {
        /// Readable table containing the events of this changeset.
        table: Table,
    },
}
90
91/// A list of changes to apply to an graph.
92pub struct GraphUpdate {
93    changesets: Mutex<Vec<ChangeSet>>,
94    event_counter: u64,
95    serialization: bincode::config::DefaultOptions,
96}
97
98impl Default for GraphUpdate {
99    fn default() -> Self {
100        GraphUpdate::new()
101    }
102}
103
104impl GraphUpdate {
105    /// Create a new empty list of updates.
106    pub fn new() -> GraphUpdate {
107        GraphUpdate {
108            event_counter: 0,
109            changesets: Mutex::new(Vec::new()),
110            serialization: bincode::options(),
111        }
112    }
113
114    /// Add the given event to the update list.
115    pub fn add_event(&mut self, event: UpdateEvent) -> Result<()> {
116        let new_event_counter = self.event_counter + 1;
117        let key = new_event_counter.create_key();
118        let value = self.serialization.serialize(&event)?;
119        let mut changeset = self.changesets.lock()?;
120        if let ChangeSet::InProgress { table_builder, .. } =
121            current_inprogress_changeset(&mut changeset)?
122        {
123            table_builder.add(&key, &value)?;
124            self.event_counter = new_event_counter;
125        }
126        Ok(())
127    }
128
129    /// Get all changes
130    pub fn iter(&self) -> Result<GraphUpdateIterator> {
131        let it = GraphUpdateIterator::new(self)?;
132        Ok(it)
133    }
134
135    /// Returns `true` if the update list is empty.
136    pub fn is_empty(&self) -> Result<bool> {
137        Ok(self.event_counter == 0)
138    }
139
140    // Returns the number of updates.
141    pub fn len(&self) -> Result<usize> {
142        let result = self.event_counter.try_into()?;
143        Ok(result)
144    }
145}
146
147fn finish_all_changesets(changesets: &mut Vec<ChangeSet>) -> Result<()> {
148    // Remove all changesets from the vector and finish them
149    let finished: Result<Vec<ChangeSet>> = changesets
150        .drain(..)
151        .map(|c| match c {
152            ChangeSet::InProgress {
153                table_builder,
154                outfile,
155            } => {
156                table_builder.finish()?;
157                // Re-open as table
158                let file = outfile.reopen()?;
159                let size = file.metadata()?.len();
160                let table = Table::new(sstable::Options::default(), Box::new(file), size as usize)?;
161                Ok(ChangeSet::Finished { table })
162            }
163            ChangeSet::Finished { table } => Ok(ChangeSet::Finished { table }),
164        })
165        .collect();
166    // Re-add the finished changesets
167    changesets.extend(finished?);
168
169    Ok(())
170}
171
172fn current_inprogress_changeset(changesets: &mut Vec<ChangeSet>) -> Result<&mut ChangeSet> {
173    let needs_new_changeset = if let Some(c) = changesets.last_mut() {
174        match c {
175            ChangeSet::InProgress { .. } => false,
176            ChangeSet::Finished { .. } => true,
177        }
178    } else {
179        true
180    };
181
182    if needs_new_changeset {
183        // Create a new changeset
184        let outfile = NamedTempFile::new()?;
185        let table_builder = TableBuilder::new(sstable::Options::default(), outfile.reopen()?);
186        let c = ChangeSet::InProgress {
187            table_builder: Box::new(table_builder),
188            outfile,
189        };
190        changesets.push(c);
191    }
192
193    // Get the last changeset, which must be in the InProgress state
194    changesets
195        .last_mut()
196        .ok_or(GraphAnnisCoreError::GraphUpdatePersistanceFileMissing)
197}
198
/// Iterator over the events of a [`GraphUpdate`], created by [`GraphUpdate::iter`].
///
/// Yields `(event ID, event)` pairs. Entries whose key or value cannot be decoded are yielded
/// as errors.
pub struct GraphUpdateIterator {
    /// One iterator per finished changeset table; the first valid one is the active one.
    iterators: Vec<TableIterator>,
    /// Value reported by `size_hint`, initialized from the event counter of the update list.
    size_hint: u64,
    /// bincode options used to decode the stored events.
    serialization: bincode::config::DefaultOptions,
}
204
205impl GraphUpdateIterator {
206    fn new(g: &GraphUpdate) -> Result<GraphUpdateIterator> {
207        let mut changesets = g.changesets.lock()?;
208
209        finish_all_changesets(&mut changesets)?;
210
211        let iterators: Vec<_> = changesets
212            .iter()
213            .filter_map(|c| match c {
214                ChangeSet::InProgress { .. } => None,
215                ChangeSet::Finished { table } => {
216                    let mut it = table.iter();
217                    it.seek_to_first();
218                    Some(it)
219                }
220            })
221            .collect();
222        Ok(GraphUpdateIterator {
223            size_hint: g.event_counter,
224            iterators,
225            serialization: g.serialization,
226        })
227    }
228}
229
230impl std::iter::Iterator for GraphUpdateIterator {
231    type Item = Result<(u64, UpdateEvent)>;
232
233    fn next(&mut self) -> Option<Self::Item> {
234        // Remove all empty table iterators.
235        self.iterators.retain(|it| it.valid());
236
237        if let Some(it) = self.iterators.first_mut() {
238            // Get the current values
239            if let Some((key, value)) = sstable::current_key_val(it) {
240                // Create the actual types
241                let id = match u64::parse_key(&key) {
242                    Ok(id) => id,
243                    Err(e) => return Some(Err(e.into())),
244                };
245                let event: UpdateEvent = match self.serialization.deserialize(&value) {
246                    Ok(event) => event,
247                    Err(e) => return Some(Err(e.into())),
248                };
249
250                // Advance for next iteration
251                it.advance();
252                return Some(Ok((id, event)));
253            }
254        }
255        None
256    }
257
258    fn size_hint(&self) -> (usize, Option<usize>) {
259        if let Ok(s) = self.size_hint.try_into() {
260            (s, Some(s))
261        } else {
262            (0, None)
263        }
264    }
265}
266
267impl Serialize for GraphUpdate {
268    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
269    where
270        S: Serializer,
271    {
272        let iter = self.iter().map_err(S::Error::custom)?;
273        let number_of_updates = self.len().map_err(S::Error::custom)?;
274        let mut map_serializer = serializer.serialize_map(Some(number_of_updates))?;
275
276        for entry in iter {
277            let (key, value) = entry.map_err(S::Error::custom)?;
278            map_serializer
279                .serialize_entry(&key, &value)
280                .map_err(S::Error::custom)?;
281        }
282
283        map_serializer.end()
284    }
285}
286
/// serde visitor that builds a [`GraphUpdate`] from a map of event ID to [`UpdateEvent`].
struct GraphUpdateVisitor {}
288
289impl<'de> Visitor<'de> for GraphUpdateVisitor {
290    type Value = GraphUpdate;
291
292    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
293        formatter.write_str("a list of graph updates")
294    }
295
296    fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
297    where
298        M: MapAccess<'de>,
299    {
300        let serialization = bincode::options();
301        let outfile = NamedTempFile::new().map_err(M::Error::custom)?;
302        let mut table_builder = TableBuilder::new(
303            sstable::Options::default(),
304            outfile.reopen().map_err(M::Error::custom)?,
305        );
306
307        let mut event_counter = 0;
308
309        while let Some((id, event)) = access.next_entry::<u64, UpdateEvent>()? {
310            event_counter = id;
311            let key = id.create_key();
312            let value = serialization.serialize(&event).map_err(M::Error::custom)?;
313            table_builder.add(&key, &value).map_err(M::Error::custom)?
314        }
315
316        let c = ChangeSet::InProgress {
317            outfile,
318            table_builder: Box::new(table_builder),
319        };
320        let mut changesets = vec![c];
321        finish_all_changesets(&mut changesets).map_err(M::Error::custom)?;
322        let g = GraphUpdate {
323            changesets: Mutex::new(changesets),
324            event_counter,
325            serialization,
326        };
327
328        Ok(g)
329    }
330}
331
332impl<'de> Deserialize<'de> for GraphUpdate {
333    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
334    where
335        D: Deserializer<'de>,
336    {
337        deserializer.deserialize_map(GraphUpdateVisitor {})
338    }
339}
340
341#[cfg(test)]
342mod tests;