graphannis_core/graph/
update.rs

1//! Types used to describe updates on graphs.
2
3use std::convert::TryInto;
4use std::fs::File;
5use std::sync::Mutex;
6
7use crate::errors::{GraphAnnisCoreError, Result};
8use crate::serializer::KeySerializer;
9use bincode::Options;
10use serde::de::Error as DeserializeError;
11use serde::de::{MapAccess, Visitor};
12use serde::ser::{Error as SerializeError, SerializeMap};
13use serde::{Deserialize, Deserializer, Serialize, Serializer};
14use sstable::{SSIterator, Table, TableBuilder, TableIterator};
15use tempfile::NamedTempFile;
16
17/// Describes a single update on the graph.
18#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
19pub enum UpdateEvent {
20    /// Add a node with a name and type.
21    AddNode {
22        node_name: String,
23        node_type: String,
24    },
25    /// Delete a node given by the name.
26    DeleteNode { node_name: String },
27    /// Add a label to a the node given by the name.
28    AddNodeLabel {
29        node_name: String,
30        anno_ns: String,
31        anno_name: String,
32        anno_value: String,
33    },
34    /// Delete a label of an node given by the name of the node and the qualified label name.
35    DeleteNodeLabel {
36        node_name: String,
37        anno_ns: String,
38        anno_name: String,
39    },
40    /// Add an edge between two nodes given by their name.
41    AddEdge {
42        source_node: String,
43        target_node: String,
44        layer: String,
45        component_type: String,
46        component_name: String,
47    },
48    /// Delete an existing edge between two nodes given by their name.
49    DeleteEdge {
50        source_node: String,
51        target_node: String,
52        layer: String,
53        component_type: String,
54        component_name: String,
55    },
56    /// Add a label to an edge between two nodes.
57    AddEdgeLabel {
58        source_node: String,
59        target_node: String,
60        layer: String,
61        component_type: String,
62        component_name: String,
63        anno_ns: String,
64        anno_name: String,
65        anno_value: String,
66    },
67    /// Delete a label from an edge between two nodes.
68    DeleteEdgeLabel {
69        source_node: String,
70        target_node: String,
71        layer: String,
72        component_type: String,
73        component_name: String,
74        anno_ns: String,
75        anno_name: String,
76    },
77}
78
79enum ChangeSet {
80    InProgress {
81        table_builder: Box<TableBuilder<File>>,
82        outfile: NamedTempFile,
83    },
84    Finished {
85        table: Table,
86    },
87}
88
89/// A list of changes to apply to an graph.
90pub struct GraphUpdate {
91    changesets: Mutex<Vec<ChangeSet>>,
92    event_counter: u64,
93    serialization: bincode::config::DefaultOptions,
94}
95
96impl Default for GraphUpdate {
97    fn default() -> Self {
98        GraphUpdate::new()
99    }
100}
101
102impl GraphUpdate {
103    /// Create a new empty list of updates.
104    pub fn new() -> GraphUpdate {
105        GraphUpdate {
106            event_counter: 0,
107            changesets: Mutex::new(Vec::new()),
108            serialization: bincode::options(),
109        }
110    }
111
112    /// Add the given event to the update list.
113    pub fn add_event(&mut self, event: UpdateEvent) -> Result<()> {
114        let new_event_counter = self.event_counter + 1;
115        let key = new_event_counter.create_key();
116        let value = self.serialization.serialize(&event)?;
117        let mut changeset = self.changesets.lock()?;
118        if let ChangeSet::InProgress { table_builder, .. } =
119            current_inprogress_changeset(&mut changeset)?
120        {
121            table_builder.add(&key, &value)?;
122            self.event_counter = new_event_counter;
123        }
124        Ok(())
125    }
126
127    /// Get all changes
128    pub fn iter(&self) -> Result<GraphUpdateIterator> {
129        let it = GraphUpdateIterator::new(self)?;
130        Ok(it)
131    }
132
133    /// Returns `true` if the update list is empty.
134    pub fn is_empty(&self) -> Result<bool> {
135        Ok(self.event_counter == 0)
136    }
137
138    // Returns the number of updates.
139    pub fn len(&self) -> Result<usize> {
140        let result = self.event_counter.try_into()?;
141        Ok(result)
142    }
143}
144
145fn finish_all_changesets(changesets: &mut Vec<ChangeSet>) -> Result<()> {
146    // Remove all changesets from the vector and finish them
147    let finished: Result<Vec<ChangeSet>> = changesets
148        .drain(..)
149        .map(|c| match c {
150            ChangeSet::InProgress {
151                table_builder,
152                outfile,
153            } => {
154                table_builder.finish()?;
155                // Re-open as table
156                let file = outfile.reopen()?;
157                let size = file.metadata()?.len();
158                let table = Table::new(sstable::Options::default(), Box::new(file), size as usize)?;
159                Ok(ChangeSet::Finished { table })
160            }
161            ChangeSet::Finished { table } => Ok(ChangeSet::Finished { table }),
162        })
163        .collect();
164    // Re-add the finished changesets
165    changesets.extend(finished?);
166
167    Ok(())
168}
169
170fn current_inprogress_changeset(changesets: &mut Vec<ChangeSet>) -> Result<&mut ChangeSet> {
171    let needs_new_changeset = if let Some(c) = changesets.last_mut() {
172        match c {
173            ChangeSet::InProgress { .. } => false,
174            ChangeSet::Finished { .. } => true,
175        }
176    } else {
177        true
178    };
179
180    if needs_new_changeset {
181        // Create a new changeset
182        let outfile = NamedTempFile::new()?;
183        let table_builder = TableBuilder::new(sstable::Options::default(), outfile.reopen()?);
184        let c = ChangeSet::InProgress {
185            table_builder: Box::new(table_builder),
186            outfile,
187        };
188        changesets.push(c);
189    }
190
191    // Get the last changeset, which must be in the InProgress state
192    changesets
193        .last_mut()
194        .ok_or(GraphAnnisCoreError::GraphUpdatePersistanceFileMissing)
195}
196
197pub struct GraphUpdateIterator {
198    iterators: Vec<TableIterator>,
199    size_hint: u64,
200    serialization: bincode::config::DefaultOptions,
201}
202
203impl GraphUpdateIterator {
204    fn new(g: &GraphUpdate) -> Result<GraphUpdateIterator> {
205        let mut changesets = g.changesets.lock()?;
206
207        finish_all_changesets(&mut changesets)?;
208
209        let iterators: Vec<_> = changesets
210            .iter()
211            .filter_map(|c| match c {
212                ChangeSet::InProgress { .. } => None,
213                ChangeSet::Finished { table } => {
214                    let mut it = table.iter();
215                    it.seek_to_first();
216                    Some(it)
217                }
218            })
219            .collect();
220        Ok(GraphUpdateIterator {
221            size_hint: g.event_counter,
222            iterators,
223            serialization: g.serialization,
224        })
225    }
226}
227
228impl std::iter::Iterator for GraphUpdateIterator {
229    type Item = Result<(u64, UpdateEvent)>;
230
231    fn next(&mut self) -> Option<Self::Item> {
232        // Remove all empty table iterators.
233        self.iterators.retain(|it| it.valid());
234
235        if let Some(it) = self.iterators.first_mut() {
236            // Get the current values
237            if let Some((key, value)) = sstable::current_key_val(it) {
238                // Create the actual types
239                let id = match u64::parse_key(&key) {
240                    Ok(id) => id,
241                    Err(e) => return Some(Err(e.into())),
242                };
243                let event: UpdateEvent = match self.serialization.deserialize(&value) {
244                    Ok(event) => event,
245                    Err(e) => return Some(Err(e.into())),
246                };
247
248                // Advance for next iteration
249                it.advance();
250                return Some(Ok((id, event)));
251            }
252        }
253        None
254    }
255
256    fn size_hint(&self) -> (usize, Option<usize>) {
257        if let Ok(s) = self.size_hint.try_into() {
258            (s, Some(s))
259        } else {
260            (0, None)
261        }
262    }
263}
264
265impl Serialize for GraphUpdate {
266    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
267    where
268        S: Serializer,
269    {
270        let iter = self.iter().map_err(S::Error::custom)?;
271        let number_of_updates = self.len().map_err(S::Error::custom)?;
272        let mut map_serializer = serializer.serialize_map(Some(number_of_updates))?;
273
274        for entry in iter {
275            let (key, value) = entry.map_err(S::Error::custom)?;
276            map_serializer
277                .serialize_entry(&key, &value)
278                .map_err(S::Error::custom)?;
279        }
280
281        map_serializer.end()
282    }
283}
284
/// Serde visitor that builds a [`GraphUpdate`] from a map of event IDs to [`UpdateEvent`]s.
struct GraphUpdateVisitor;
286
287impl<'de> Visitor<'de> for GraphUpdateVisitor {
288    type Value = GraphUpdate;
289
290    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
291        formatter.write_str("a list of graph updates")
292    }
293
294    fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
295    where
296        M: MapAccess<'de>,
297    {
298        let serialization = bincode::options();
299        let outfile = NamedTempFile::new().map_err(M::Error::custom)?;
300        let mut table_builder = TableBuilder::new(
301            sstable::Options::default(),
302            outfile.reopen().map_err(M::Error::custom)?,
303        );
304
305        let mut event_counter = 0;
306
307        while let Some((id, event)) = access.next_entry::<u64, UpdateEvent>()? {
308            event_counter = id;
309            let key = id.create_key();
310            let value = serialization.serialize(&event).map_err(M::Error::custom)?;
311            table_builder.add(&key, &value).map_err(M::Error::custom)?
312        }
313
314        let c = ChangeSet::InProgress {
315            outfile,
316            table_builder: Box::new(table_builder),
317        };
318        let mut changesets = vec![c];
319        finish_all_changesets(&mut changesets).map_err(M::Error::custom)?;
320        let g = GraphUpdate {
321            changesets: Mutex::new(changesets),
322            event_counter,
323            serialization,
324        };
325
326        Ok(g)
327    }
328}
329
330impl<'de> Deserialize<'de> for GraphUpdate {
331    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
332    where
333        D: Deserializer<'de>,
334    {
335        deserializer.deserialize_map(GraphUpdateVisitor {})
336    }
337}
338
339#[cfg(test)]
340mod tests;