grafeo_core/graph/lpg/store/
schema.rs1use super::{LpgStore, PropertyUndoEntry};
4use grafeo_common::types::{NodeId, TransactionId};
5use grafeo_common::utils::hash::FxHashMap;
6
7impl LpgStore {
8 #[cfg(not(feature = "tiered-storage"))]
13 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
14 let epoch = self.current_epoch();
15
16 let nodes = self.nodes.read();
18 if let Some(chain) = nodes.get(&node_id) {
19 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
20 return false;
21 }
22 } else {
23 return false;
24 }
25 drop(nodes);
26
27 let label_id = self.get_or_create_label_id(label);
29
30 let mut node_labels = self.node_labels.write();
32 let label_set = node_labels.entry(node_id).or_default();
33
34 if label_set.contains(&label_id) {
35 return false; }
37
38 label_set.insert(label_id);
39 drop(node_labels);
40
41 let mut index = self.label_index.write();
43 if (label_id as usize) >= index.len() {
44 index.resize(label_id as usize + 1, FxHashMap::default());
45 }
46 index[label_id as usize].insert(node_id, ());
47
48 if let Some(chain) = self.nodes.write().get_mut(&node_id)
50 && let Some(record) = chain.latest_mut()
51 {
52 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
53 record.set_label_count(count as u16);
54 }
55
56 true
57 }
58
/// Attaches `label` to the node `node_id` (tiered-storage build).
///
/// Returns `true` when the label was newly attached. Returns `false` when
/// the node has no version index, has no version visible at the current
/// epoch, its record cannot be read, the record is deleted, or the node
/// already carries the label.
///
/// On success the label id is recorded in the per-node label set and in the
/// per-label index. This variant does not update a label count on the record.
#[cfg(feature = "tiered-storage")]
pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
    let epoch = self.current_epoch();

    // Liveness check: every missing link in the lookup chain means "no such
    // live node". The read guard is released at the end of this block.
    {
        let versions = self.node_versions.read();
        let Some(version_index) = versions.get(&node_id) else {
            return false;
        };
        let Some(vref) = version_index.visible_at(epoch) else {
            return false;
        };
        let Some(record) = self.read_node_record(&vref) else {
            return false;
        };
        if record.is_deleted() {
            return false;
        }
    }

    let label_id = self.get_or_create_label_id(label);

    // Record the label on the node; bail out if it was already there.
    {
        let mut node_labels = self.node_labels.write();
        if !node_labels.entry(node_id).or_default().insert(label_id) {
            return false;
        }
    }

    // Update the label -> nodes index, growing it to cover `label_id`.
    let slot = label_id as usize;
    let mut index = self.label_index.write();
    if index.len() <= slot {
        index.resize(slot + 1, FxHashMap::default());
    }
    index[slot].insert(node_id, ());

    true
}
110
111 #[cfg(not(feature = "tiered-storage"))]
116 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
117 let epoch = self.current_epoch();
118
119 let nodes = self.nodes.read();
121 if let Some(chain) = nodes.get(&node_id) {
122 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
123 return false;
124 }
125 } else {
126 return false;
127 }
128 drop(nodes);
129
130 let label_id = {
132 let label_ids = self.label_to_id.read();
133 match label_ids.get(label) {
134 Some(&id) => id,
135 None => return false, }
137 };
138
139 let mut node_labels = self.node_labels.write();
141 if let Some(label_set) = node_labels.get_mut(&node_id) {
142 if !label_set.remove(&label_id) {
143 return false; }
145 } else {
146 return false;
147 }
148 drop(node_labels);
149
150 let mut index = self.label_index.write();
152 if (label_id as usize) < index.len() {
153 index[label_id as usize].remove(&node_id);
154 }
155
156 if let Some(chain) = self.nodes.write().get_mut(&node_id)
158 && let Some(record) = chain.latest_mut()
159 {
160 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
161 record.set_label_count(count as u16);
162 }
163
164 true
165 }
166
/// Detaches `label` from the node `node_id` (tiered-storage build).
///
/// Returns `true` when the label was present and has been removed. Returns
/// `false` when the node has no version index, has no version visible at the
/// current epoch, its record cannot be read, the record is deleted, when
/// `label` has never been registered, or when the node does not carry it.
///
/// On success the label id is removed from the per-node label set and from
/// the per-label index. This variant does not update a label count on the record.
#[cfg(feature = "tiered-storage")]
pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
    let epoch = self.current_epoch();

    // Liveness check: every missing link in the lookup chain means "no such
    // live node". The read guard is released at the end of this block.
    {
        let versions = self.node_versions.read();
        let Some(version_index) = versions.get(&node_id) else {
            return false;
        };
        let Some(vref) = version_index.visible_at(epoch) else {
            return false;
        };
        let Some(record) = self.read_node_record(&vref) else {
            return false;
        };
        if record.is_deleted() {
            return false;
        }
    }

    // Only look the label up; removal must never register a new label.
    let label_id = {
        let known_labels = self.label_to_id.read();
        let Some(&id) = known_labels.get(label) else {
            return false;
        };
        id
    };

    // Remove the label from the node's set; `false` when the node has no
    // set or the set did not contain the label.
    {
        let mut node_labels = self.node_labels.write();
        let was_present = node_labels
            .get_mut(&node_id)
            .is_some_and(|label_set| label_set.remove(&label_id));
        if !was_present {
            return false;
        }
    }

    // Remove the node from the label -> nodes index when the slot exists.
    let slot = label_id as usize;
    let mut index = self.label_index.write();
    if slot < index.len() {
        index[slot].remove(&node_id);
    }

    true
}
222
223 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
229 let label_to_id = self.label_to_id.read();
230 if let Some(&label_id) = label_to_id.get(label) {
231 let index = self.label_index.read();
232 if let Some(set) = index.get(label_id as usize) {
233 let mut ids: Vec<NodeId> = set.keys().copied().collect();
234 ids.sort_unstable();
235 return ids;
236 }
237 }
238 Vec::new()
239 }
240
241 #[must_use]
243 pub fn label_count(&self) -> usize {
244 self.id_to_label.read().len()
245 }
246
247 #[must_use]
251 pub fn property_key_count(&self) -> usize {
252 let node_keys = self.node_properties.column_count();
253 let edge_keys = self.edge_properties.column_count();
254 node_keys + edge_keys
258 }
259
260 #[must_use]
262 pub fn edge_type_count(&self) -> usize {
263 self.id_to_edge_type.read().len()
264 }
265
266 pub fn all_labels(&self) -> Vec<String> {
268 self.id_to_label
269 .read()
270 .iter()
271 .map(|s| s.to_string())
272 .collect()
273 }
274
275 pub fn all_edge_types(&self) -> Vec<String> {
277 self.id_to_edge_type
278 .read()
279 .iter()
280 .map(|s| s.to_string())
281 .collect()
282 }
283
284 pub fn all_property_keys(&self) -> Vec<String> {
286 let mut keys = std::collections::HashSet::new();
287 for key in self.node_properties.keys() {
288 keys.insert(key.to_string());
289 }
290 for key in self.edge_properties.keys() {
291 keys.insert(key.to_string());
292 }
293 keys.into_iter().collect()
294 }
295
296 #[must_use]
298 pub fn peek_next_node_id(&self) -> u64 {
299 self.next_node_id.load(std::sync::atomic::Ordering::Relaxed)
300 }
301
302 #[must_use]
304 pub fn peek_next_edge_id(&self) -> u64 {
305 self.next_edge_id.load(std::sync::atomic::Ordering::Relaxed)
306 }
307
308 pub fn add_label_versioned(
311 &self,
312 node_id: NodeId,
313 label: &str,
314 transaction_id: TransactionId,
315 ) -> bool {
316 let added = self.add_label(node_id, label);
317 if added {
318 self.property_undo_log
319 .write()
320 .entry(transaction_id)
321 .or_default()
322 .push(PropertyUndoEntry::LabelAdded {
323 node_id,
324 label: label.to_string(),
325 });
326 }
327 added
328 }
329
330 pub fn remove_label_versioned(
333 &self,
334 node_id: NodeId,
335 label: &str,
336 transaction_id: TransactionId,
337 ) -> bool {
338 let removed = self.remove_label(node_id, label);
339 if removed {
340 self.property_undo_log
341 .write()
342 .entry(transaction_id)
343 .or_default()
344 .push(PropertyUndoEntry::LabelRemoved {
345 node_id,
346 label: label.to_string(),
347 });
348 }
349 removed
350 }
351}