1use std::convert::TryInto;
4use std::fs::File;
5use std::sync::Mutex;
6
7use crate::errors::{GraphAnnisCoreError, Result};
8use crate::serializer::KeySerializer;
9use bincode::Options;
10use facet::Facet;
11use serde::de::Error as DeserializeError;
12use serde::de::{MapAccess, Visitor};
13use serde::ser::{Error as SerializeError, SerializeMap};
14use serde::{Deserialize, Deserializer, Serialize, Serializer};
15use sstable::{SSIterator, Table, TableBuilder, TableIterator};
16use tempfile::NamedTempFile;
17
/// A single change to a graph, expressed purely through string identifiers.
///
/// Each variant names the element it refers to (a node, an edge, or a label on
/// one of them) via its fields. This type only describes the change; applying
/// it to a graph happens elsewhere.
///
/// NOTE(review): events are serialized with bincode in this file; reordering or
/// removing variants would presumably change the stored format — confirm
/// before touching the variant order.
#[derive(Serialize, Deserialize, Clone, Debug, Facet, PartialEq)]
#[repr(u8)]
pub enum UpdateEvent {
    /// Add a node named `node_name` having the type `node_type`.
    AddNode {
        node_name: String,
        node_type: String,
    },
    /// Delete the node named `node_name`.
    DeleteNode { node_name: String },
    /// Add a label to the node `node_name`. The label is described by
    /// `anno_ns` (presumably the annotation namespace), `anno_name` and
    /// `anno_value`.
    AddNodeLabel {
        node_name: String,
        anno_ns: String,
        anno_name: String,
        anno_value: String,
    },
    /// Delete the label identified by `anno_ns` and `anno_name` from the node
    /// `node_name`.
    DeleteNodeLabel {
        node_name: String,
        anno_ns: String,
        anno_name: String,
    },
    /// Add an edge between `source_node` and `target_node`. The fields
    /// `layer`, `component_type` and `component_name` presumably select the
    /// component the edge belongs to.
    AddEdge {
        source_node: String,
        target_node: String,
        layer: String,
        component_type: String,
        component_name: String,
    },
    /// Delete the edge between `source_node` and `target_node` that is
    /// described by `layer`, `component_type` and `component_name`.
    DeleteEdge {
        source_node: String,
        target_node: String,
        layer: String,
        component_type: String,
        component_name: String,
    },
    /// Add a label (`anno_ns`, `anno_name`, `anno_value`) to the edge between
    /// `source_node` and `target_node` described by `layer`,
    /// `component_type` and `component_name`.
    AddEdgeLabel {
        source_node: String,
        target_node: String,
        layer: String,
        component_type: String,
        component_name: String,
        anno_ns: String,
        anno_name: String,
        anno_value: String,
    },
    /// Delete the label identified by `anno_ns` and `anno_name` from the edge
    /// between `source_node` and `target_node` described by `layer`,
    /// `component_type` and `component_name`.
    DeleteEdgeLabel {
        source_node: String,
        target_node: String,
        layer: String,
        component_type: String,
        component_name: String,
        anno_ns: String,
        anno_name: String,
    },
}
80
/// A batch of serialized update events stored in a sorted string table file.
enum ChangeSet {
    /// The table is still being written and accepts further entries.
    InProgress {
        /// Builder that appends key/value entries to the table file.
        table_builder: Box<TableBuilder<File>>,
        /// Temporary file backing the table; it is reopened for reading once
        /// the changeset gets finished.
        outfile: NamedTempFile,
    },
    /// The table has been completely written and can be read.
    Finished {
        /// Read access to the finished table.
        table: Table,
    },
}
90
/// An ordered list of [`UpdateEvent`]s whose serialized form is kept in
/// temporary table files rather than in memory.
pub struct GraphUpdate {
    /// All changesets created so far. Wrapped in a mutex because creating an
    /// iterator finishes pending changesets through a shared reference.
    changesets: Mutex<Vec<ChangeSet>>,
    /// Number given to the most recently stored event; `0` when no event has
    /// been stored yet.
    event_counter: u64,
    /// bincode configuration used to encode and decode the events.
    serialization: bincode::config::DefaultOptions,
}
97
98impl Default for GraphUpdate {
99 fn default() -> Self {
100 GraphUpdate::new()
101 }
102}
103
104impl GraphUpdate {
105 pub fn new() -> GraphUpdate {
107 GraphUpdate {
108 event_counter: 0,
109 changesets: Mutex::new(Vec::new()),
110 serialization: bincode::options(),
111 }
112 }
113
114 pub fn add_event(&mut self, event: UpdateEvent) -> Result<()> {
116 let new_event_counter = self.event_counter + 1;
117 let key = new_event_counter.create_key();
118 let value = self.serialization.serialize(&event)?;
119 let mut changeset = self.changesets.lock()?;
120 if let ChangeSet::InProgress { table_builder, .. } =
121 current_inprogress_changeset(&mut changeset)?
122 {
123 table_builder.add(&key, &value)?;
124 self.event_counter = new_event_counter;
125 }
126 Ok(())
127 }
128
129 pub fn iter(&self) -> Result<GraphUpdateIterator> {
131 let it = GraphUpdateIterator::new(self)?;
132 Ok(it)
133 }
134
135 pub fn is_empty(&self) -> Result<bool> {
137 Ok(self.event_counter == 0)
138 }
139
140 pub fn len(&self) -> Result<usize> {
142 let result = self.event_counter.try_into()?;
143 Ok(result)
144 }
145}
146
147fn finish_all_changesets(changesets: &mut Vec<ChangeSet>) -> Result<()> {
148 let finished: Result<Vec<ChangeSet>> = changesets
150 .drain(..)
151 .map(|c| match c {
152 ChangeSet::InProgress {
153 table_builder,
154 outfile,
155 } => {
156 table_builder.finish()?;
157 let file = outfile.reopen()?;
159 let size = file.metadata()?.len();
160 let table = Table::new(sstable::Options::default(), Box::new(file), size as usize)?;
161 Ok(ChangeSet::Finished { table })
162 }
163 ChangeSet::Finished { table } => Ok(ChangeSet::Finished { table }),
164 })
165 .collect();
166 changesets.extend(finished?);
168
169 Ok(())
170}
171
172fn current_inprogress_changeset(changesets: &mut Vec<ChangeSet>) -> Result<&mut ChangeSet> {
173 let needs_new_changeset = if let Some(c) = changesets.last_mut() {
174 match c {
175 ChangeSet::InProgress { .. } => false,
176 ChangeSet::Finished { .. } => true,
177 }
178 } else {
179 true
180 };
181
182 if needs_new_changeset {
183 let outfile = NamedTempFile::new()?;
185 let table_builder = TableBuilder::new(sstable::Options::default(), outfile.reopen()?);
186 let c = ChangeSet::InProgress {
187 table_builder: Box::new(table_builder),
188 outfile,
189 };
190 changesets.push(c);
191 }
192
193 changesets
195 .last_mut()
196 .ok_or(GraphAnnisCoreError::GraphUpdatePersistanceFileMissing)
197}
198
/// Iterator over the `(id, event)` pairs stored in a [`GraphUpdate`].
pub struct GraphUpdateIterator {
    /// One table iterator per finished changeset, in changeset order.
    iterators: Vec<TableIterator>,
    /// Initialized from the event counter of the `GraphUpdate`; basis for the
    /// value reported by `Iterator::size_hint`.
    size_hint: u64,
    /// bincode configuration used to decode the stored events.
    serialization: bincode::config::DefaultOptions,
}
204
205impl GraphUpdateIterator {
206 fn new(g: &GraphUpdate) -> Result<GraphUpdateIterator> {
207 let mut changesets = g.changesets.lock()?;
208
209 finish_all_changesets(&mut changesets)?;
210
211 let iterators: Vec<_> = changesets
212 .iter()
213 .filter_map(|c| match c {
214 ChangeSet::InProgress { .. } => None,
215 ChangeSet::Finished { table } => {
216 let mut it = table.iter();
217 it.seek_to_first();
218 Some(it)
219 }
220 })
221 .collect();
222 Ok(GraphUpdateIterator {
223 size_hint: g.event_counter,
224 iterators,
225 serialization: g.serialization,
226 })
227 }
228}
229
230impl std::iter::Iterator for GraphUpdateIterator {
231 type Item = Result<(u64, UpdateEvent)>;
232
233 fn next(&mut self) -> Option<Self::Item> {
234 self.iterators.retain(|it| it.valid());
236
237 if let Some(it) = self.iterators.first_mut() {
238 if let Some((key, value)) = sstable::current_key_val(it) {
240 let id = match u64::parse_key(&key) {
242 Ok(id) => id,
243 Err(e) => return Some(Err(e.into())),
244 };
245 let event: UpdateEvent = match self.serialization.deserialize(&value) {
246 Ok(event) => event,
247 Err(e) => return Some(Err(e.into())),
248 };
249
250 it.advance();
252 return Some(Ok((id, event)));
253 }
254 }
255 None
256 }
257
258 fn size_hint(&self) -> (usize, Option<usize>) {
259 if let Ok(s) = self.size_hint.try_into() {
260 (s, Some(s))
261 } else {
262 (0, None)
263 }
264 }
265}
266
267impl Serialize for GraphUpdate {
268 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
269 where
270 S: Serializer,
271 {
272 let iter = self.iter().map_err(S::Error::custom)?;
273 let number_of_updates = self.len().map_err(S::Error::custom)?;
274 let mut map_serializer = serializer.serialize_map(Some(number_of_updates))?;
275
276 for entry in iter {
277 let (key, value) = entry.map_err(S::Error::custom)?;
278 map_serializer
279 .serialize_entry(&key, &value)
280 .map_err(S::Error::custom)?;
281 }
282
283 map_serializer.end()
284 }
285}
286
/// Stateless serde visitor that builds a [`GraphUpdate`] from a serialized map.
struct GraphUpdateVisitor {}
288
289impl<'de> Visitor<'de> for GraphUpdateVisitor {
290 type Value = GraphUpdate;
291
292 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
293 formatter.write_str("a list of graph updates")
294 }
295
296 fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
297 where
298 M: MapAccess<'de>,
299 {
300 let serialization = bincode::options();
301 let outfile = NamedTempFile::new().map_err(M::Error::custom)?;
302 let mut table_builder = TableBuilder::new(
303 sstable::Options::default(),
304 outfile.reopen().map_err(M::Error::custom)?,
305 );
306
307 let mut event_counter = 0;
308
309 while let Some((id, event)) = access.next_entry::<u64, UpdateEvent>()? {
310 event_counter = id;
311 let key = id.create_key();
312 let value = serialization.serialize(&event).map_err(M::Error::custom)?;
313 table_builder.add(&key, &value).map_err(M::Error::custom)?
314 }
315
316 let c = ChangeSet::InProgress {
317 outfile,
318 table_builder: Box::new(table_builder),
319 };
320 let mut changesets = vec![c];
321 finish_all_changesets(&mut changesets).map_err(M::Error::custom)?;
322 let g = GraphUpdate {
323 changesets: Mutex::new(changesets),
324 event_counter,
325 serialization,
326 };
327
328 Ok(g)
329 }
330}
331
332impl<'de> Deserialize<'de> for GraphUpdate {
333 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
334 where
335 D: Deserializer<'de>,
336 {
337 deserializer.deserialize_map(GraphUpdateVisitor {})
338 }
339}
340
341#[cfg(test)]
342mod tests;