1use std::convert::TryInto;
4use std::fs::File;
5use std::sync::Mutex;
6
7use crate::errors::{GraphAnnisCoreError, Result};
8use crate::serializer::KeySerializer;
9use bincode::Options;
10use serde::de::Error as DeserializeError;
11use serde::de::{MapAccess, Visitor};
12use serde::ser::{Error as SerializeError, SerializeMap};
13use serde::{Deserialize, Deserializer, Serialize, Serializer};
14use sstable::{SSIterator, Table, TableBuilder, TableIterator};
15use tempfile::NamedTempFile;
16
17#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
19pub enum UpdateEvent {
20 AddNode {
22 node_name: String,
23 node_type: String,
24 },
25 DeleteNode { node_name: String },
27 AddNodeLabel {
29 node_name: String,
30 anno_ns: String,
31 anno_name: String,
32 anno_value: String,
33 },
34 DeleteNodeLabel {
36 node_name: String,
37 anno_ns: String,
38 anno_name: String,
39 },
40 AddEdge {
42 source_node: String,
43 target_node: String,
44 layer: String,
45 component_type: String,
46 component_name: String,
47 },
48 DeleteEdge {
50 source_node: String,
51 target_node: String,
52 layer: String,
53 component_type: String,
54 component_name: String,
55 },
56 AddEdgeLabel {
58 source_node: String,
59 target_node: String,
60 layer: String,
61 component_type: String,
62 component_name: String,
63 anno_ns: String,
64 anno_name: String,
65 anno_value: String,
66 },
67 DeleteEdgeLabel {
69 source_node: String,
70 target_node: String,
71 layer: String,
72 component_type: String,
73 component_name: String,
74 anno_ns: String,
75 anno_name: String,
76 },
77}
78
/// A batch of serialized update events stored in an sstable file.
///
/// New events are written into an `InProgress` change set. `finish_all_changesets`
/// turns it into a `Finished` table, which is what `GraphUpdateIterator` reads.
enum ChangeSet {
    /// Still accepting events. `table_builder` writes into a reopened handle of
    /// the temporary file `outfile`.
    InProgress {
        table_builder: Box<TableBuilder<File>>,
        outfile: NamedTempFile,
    },
    /// Completed table that can be iterated.
    Finished {
        table: Table,
    },
}
88
/// An ordered list of [`UpdateEvent`]s.
///
/// Events are serialized with `bincode` and stored in sstable tables backed by
/// temporary files, not in a `Vec`. `add_event` keys each new event with the
/// next event ID; for a fresh instance the first ID is 1.
pub struct GraphUpdate {
    /// Change sets in creation order. Only the last one can be in progress,
    /// because a new one is only created when the last one is finished (or
    /// there is none).
    changesets: Mutex<Vec<ChangeSet>>,
    /// ID of the last event added (or read during deserialization); 0 if none.
    /// `len` and `is_empty` are derived from this value.
    event_counter: u64,
    /// bincode configuration used to encode and decode the events.
    serialization: bincode::config::DefaultOptions,
}
95
96impl Default for GraphUpdate {
97 fn default() -> Self {
98 GraphUpdate::new()
99 }
100}
101
102impl GraphUpdate {
103 pub fn new() -> GraphUpdate {
105 GraphUpdate {
106 event_counter: 0,
107 changesets: Mutex::new(Vec::new()),
108 serialization: bincode::options(),
109 }
110 }
111
112 pub fn add_event(&mut self, event: UpdateEvent) -> Result<()> {
114 let new_event_counter = self.event_counter + 1;
115 let key = new_event_counter.create_key();
116 let value = self.serialization.serialize(&event)?;
117 let mut changeset = self.changesets.lock()?;
118 if let ChangeSet::InProgress { table_builder, .. } =
119 current_inprogress_changeset(&mut changeset)?
120 {
121 table_builder.add(&key, &value)?;
122 self.event_counter = new_event_counter;
123 }
124 Ok(())
125 }
126
127 pub fn iter(&self) -> Result<GraphUpdateIterator> {
129 let it = GraphUpdateIterator::new(self)?;
130 Ok(it)
131 }
132
133 pub fn is_empty(&self) -> Result<bool> {
135 Ok(self.event_counter == 0)
136 }
137
138 pub fn len(&self) -> Result<usize> {
140 let result = self.event_counter.try_into()?;
141 Ok(result)
142 }
143}
144
145fn finish_all_changesets(changesets: &mut Vec<ChangeSet>) -> Result<()> {
146 let finished: Result<Vec<ChangeSet>> = changesets
148 .drain(..)
149 .map(|c| match c {
150 ChangeSet::InProgress {
151 table_builder,
152 outfile,
153 } => {
154 table_builder.finish()?;
155 let file = outfile.reopen()?;
157 let size = file.metadata()?.len();
158 let table = Table::new(sstable::Options::default(), Box::new(file), size as usize)?;
159 Ok(ChangeSet::Finished { table })
160 }
161 ChangeSet::Finished { table } => Ok(ChangeSet::Finished { table }),
162 })
163 .collect();
164 changesets.extend(finished?);
166
167 Ok(())
168}
169
170fn current_inprogress_changeset(changesets: &mut Vec<ChangeSet>) -> Result<&mut ChangeSet> {
171 let needs_new_changeset = if let Some(c) = changesets.last_mut() {
172 match c {
173 ChangeSet::InProgress { .. } => false,
174 ChangeSet::Finished { .. } => true,
175 }
176 } else {
177 true
178 };
179
180 if needs_new_changeset {
181 let outfile = NamedTempFile::new()?;
183 let table_builder = TableBuilder::new(sstable::Options::default(), outfile.reopen()?);
184 let c = ChangeSet::InProgress {
185 table_builder: Box::new(table_builder),
186 outfile,
187 };
188 changesets.push(c);
189 }
190
191 changesets
193 .last_mut()
194 .ok_or(GraphAnnisCoreError::GraphUpdatePersistanceFileMissing)
195}
196
/// Iterator over the `(id, event)` pairs of a [`GraphUpdate`].
///
/// Created by `GraphUpdate::iter`. It reads one table iterator per finished
/// change set, in change set order.
pub struct GraphUpdateIterator {
    /// One iterator per finished change set; exhausted ones are removed while iterating.
    iterators: Vec<TableIterator>,
    /// Initialized from the `GraphUpdate`'s event counter and used to answer
    /// `Iterator::size_hint`.
    size_hint: u64,
    /// bincode configuration used to decode the stored events.
    serialization: bincode::config::DefaultOptions,
}
202
203impl GraphUpdateIterator {
204 fn new(g: &GraphUpdate) -> Result<GraphUpdateIterator> {
205 let mut changesets = g.changesets.lock()?;
206
207 finish_all_changesets(&mut changesets)?;
208
209 let iterators: Vec<_> = changesets
210 .iter()
211 .filter_map(|c| match c {
212 ChangeSet::InProgress { .. } => None,
213 ChangeSet::Finished { table } => {
214 let mut it = table.iter();
215 it.seek_to_first();
216 Some(it)
217 }
218 })
219 .collect();
220 Ok(GraphUpdateIterator {
221 size_hint: g.event_counter,
222 iterators,
223 serialization: g.serialization,
224 })
225 }
226}
227
228impl std::iter::Iterator for GraphUpdateIterator {
229 type Item = Result<(u64, UpdateEvent)>;
230
231 fn next(&mut self) -> Option<Self::Item> {
232 self.iterators.retain(|it| it.valid());
234
235 if let Some(it) = self.iterators.first_mut() {
236 if let Some((key, value)) = sstable::current_key_val(it) {
238 let id = match u64::parse_key(&key) {
240 Ok(id) => id,
241 Err(e) => return Some(Err(e.into())),
242 };
243 let event: UpdateEvent = match self.serialization.deserialize(&value) {
244 Ok(event) => event,
245 Err(e) => return Some(Err(e.into())),
246 };
247
248 it.advance();
250 return Some(Ok((id, event)));
251 }
252 }
253 None
254 }
255
256 fn size_hint(&self) -> (usize, Option<usize>) {
257 if let Ok(s) = self.size_hint.try_into() {
258 (s, Some(s))
259 } else {
260 (0, None)
261 }
262 }
263}
264
265impl Serialize for GraphUpdate {
266 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
267 where
268 S: Serializer,
269 {
270 let iter = self.iter().map_err(S::Error::custom)?;
271 let number_of_updates = self.len().map_err(S::Error::custom)?;
272 let mut map_serializer = serializer.serialize_map(Some(number_of_updates))?;
273
274 for entry in iter {
275 let (key, value) = entry.map_err(S::Error::custom)?;
276 map_serializer
277 .serialize_entry(&key, &value)
278 .map_err(S::Error::custom)?;
279 }
280
281 map_serializer.end()
282 }
283}
284
/// serde visitor that rebuilds a [`GraphUpdate`] from a map of event IDs to events.
struct GraphUpdateVisitor {}
286
287impl<'de> Visitor<'de> for GraphUpdateVisitor {
288 type Value = GraphUpdate;
289
290 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
291 formatter.write_str("a list of graph updates")
292 }
293
294 fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
295 where
296 M: MapAccess<'de>,
297 {
298 let serialization = bincode::options();
299 let outfile = NamedTempFile::new().map_err(M::Error::custom)?;
300 let mut table_builder = TableBuilder::new(
301 sstable::Options::default(),
302 outfile.reopen().map_err(M::Error::custom)?,
303 );
304
305 let mut event_counter = 0;
306
307 while let Some((id, event)) = access.next_entry::<u64, UpdateEvent>()? {
308 event_counter = id;
309 let key = id.create_key();
310 let value = serialization.serialize(&event).map_err(M::Error::custom)?;
311 table_builder.add(&key, &value).map_err(M::Error::custom)?
312 }
313
314 let c = ChangeSet::InProgress {
315 outfile,
316 table_builder: Box::new(table_builder),
317 };
318 let mut changesets = vec![c];
319 finish_all_changesets(&mut changesets).map_err(M::Error::custom)?;
320 let g = GraphUpdate {
321 changesets: Mutex::new(changesets),
322 event_counter,
323 serialization,
324 };
325
326 Ok(g)
327 }
328}
329
330impl<'de> Deserialize<'de> for GraphUpdate {
331 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
332 where
333 D: Deserializer<'de>,
334 {
335 deserializer.deserialize_map(GraphUpdateVisitor {})
336 }
337}
338
339#[cfg(test)]
340mod tests;