use super::{LpgStore, PropertyUndoEntry};
#[cfg(feature = "temporal")]
use grafeo_common::types::EpochId;
use grafeo_common::types::{NodeId, TransactionId};
use grafeo_common::utils::hash::FxHashMap;
impl LpgStore {
#[cfg(not(feature = "tiered-storage"))]
pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
let epoch = self.current_epoch();
let nodes = self.nodes.read();
if let Some(chain) = nodes.get(&node_id) {
if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
return false;
}
} else {
return false;
}
drop(nodes);
let label_id = self.get_or_create_label_id(label);
let mut node_labels = self.node_labels.write();
#[cfg(not(feature = "temporal"))]
{
let label_set = node_labels.entry(node_id).or_default();
if label_set.contains(&label_id) {
return false;
}
label_set.insert(label_id);
}
#[cfg(feature = "temporal")]
{
let current = node_labels
.get(&node_id)
.and_then(|log| log.latest())
.cloned()
.unwrap_or_default();
if current.contains(&label_id) {
return false;
}
let mut new_set = current;
new_set.insert(label_id);
node_labels
.entry(node_id)
.or_default()
.append(self.current_epoch(), new_set);
}
drop(node_labels);
let mut index = self.label_index.write();
if (label_id as usize) >= index.len() {
index.resize(label_id as usize + 1, FxHashMap::default());
}
index[label_id as usize].insert(node_id, ());
#[cfg(not(feature = "temporal"))]
if let Some(chain) = self.nodes.write().get_mut(&node_id)
&& let Some(record) = chain.latest_mut()
{
let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
record.set_label_count(count as u16);
}
true
}
#[cfg(feature = "tiered-storage")]
pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
let epoch = self.current_epoch();
let versions = self.node_versions.read();
if let Some(index) = versions.get(&node_id) {
if let Some(vref) = index.visible_at(epoch) {
if let Some(record) = self.read_node_record(&vref) {
if record.is_deleted() {
return false;
}
} else {
return false;
}
} else {
return false;
}
} else {
return false;
}
drop(versions);
let label_id = self.get_or_create_label_id(label);
let mut node_labels = self.node_labels.write();
#[cfg(not(feature = "temporal"))]
{
let label_set = node_labels.entry(node_id).or_default();
if label_set.contains(&label_id) {
return false;
}
label_set.insert(label_id);
}
#[cfg(feature = "temporal")]
{
let current = node_labels
.get(&node_id)
.and_then(|log| log.latest())
.cloned()
.unwrap_or_default();
if current.contains(&label_id) {
return false;
}
let mut new_set = current;
new_set.insert(label_id);
node_labels
.entry(node_id)
.or_default()
.append(self.current_epoch(), new_set);
}
drop(node_labels);
let mut index = self.label_index.write();
if (label_id as usize) >= index.len() {
index.resize(label_id as usize + 1, FxHashMap::default());
}
index[label_id as usize].insert(node_id, ());
true
}
#[cfg(not(feature = "tiered-storage"))]
pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
let epoch = self.current_epoch();
let nodes = self.nodes.read();
if let Some(chain) = nodes.get(&node_id) {
if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
return false;
}
} else {
return false;
}
drop(nodes);
let label_id = {
let reg = self.label_registry.read();
match reg.get_id(label) {
Some(id) => id,
None => return false, }
};
let mut node_labels = self.node_labels.write();
#[cfg(not(feature = "temporal"))]
{
if let Some(label_set) = node_labels.get_mut(&node_id) {
if !label_set.remove(&label_id) {
return false;
}
} else {
return false;
}
}
#[cfg(feature = "temporal")]
{
let current = node_labels
.get(&node_id)
.and_then(|log| log.latest())
.cloned()
.unwrap_or_default();
if !current.contains(&label_id) {
return false;
}
let mut new_set = current;
new_set.remove(&label_id);
node_labels
.entry(node_id)
.or_default()
.append(self.current_epoch(), new_set);
}
drop(node_labels);
let mut index = self.label_index.write();
if (label_id as usize) < index.len() {
index[label_id as usize].remove(&node_id);
}
#[cfg(not(feature = "temporal"))]
if let Some(chain) = self.nodes.write().get_mut(&node_id)
&& let Some(record) = chain.latest_mut()
{
let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
record.set_label_count(count as u16);
}
true
}
#[cfg(feature = "tiered-storage")]
pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
let epoch = self.current_epoch();
let versions = self.node_versions.read();
if let Some(index) = versions.get(&node_id) {
if let Some(vref) = index.visible_at(epoch) {
if let Some(record) = self.read_node_record(&vref) {
if record.is_deleted() {
return false;
}
} else {
return false;
}
} else {
return false;
}
} else {
return false;
}
drop(versions);
let label_id = {
let reg = self.label_registry.read();
match reg.get_id(label) {
Some(id) => id,
None => return false,
}
};
let mut node_labels = self.node_labels.write();
#[cfg(not(feature = "temporal"))]
{
if let Some(label_set) = node_labels.get_mut(&node_id) {
if !label_set.remove(&label_id) {
return false;
}
} else {
return false;
}
}
#[cfg(feature = "temporal")]
{
let current = node_labels
.get(&node_id)
.and_then(|log| log.latest())
.cloned()
.unwrap_or_default();
if !current.contains(&label_id) {
return false;
}
let mut new_set = current;
new_set.remove(&label_id);
node_labels
.entry(node_id)
.or_default()
.append(self.current_epoch(), new_set);
}
drop(node_labels);
let mut index = self.label_index.write();
if (label_id as usize) < index.len() {
index[label_id as usize].remove(&node_id);
}
true
}
pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
let reg = self.label_registry.read();
if let Some(label_id) = reg.get_id(label) {
let index = self.label_index.read();
if let Some(set) = index.get(label_id as usize) {
let mut ids: Vec<NodeId> = set.keys().copied().collect();
ids.sort_unstable();
return ids;
}
}
Vec::new()
}
#[must_use]
pub fn label_count(&self) -> usize {
self.label_registry.read().len()
}
#[must_use]
pub fn property_key_count(&self) -> usize {
let node_keys = self.node_properties.column_count();
let edge_keys = self.edge_properties.column_count();
node_keys + edge_keys
}
#[must_use]
pub fn edge_type_count(&self) -> usize {
self.id_to_edge_type.read().len()
}
pub fn all_labels(&self) -> Vec<String> {
self.label_registry
.read()
.names()
.iter()
.map(|s| s.to_string())
.collect()
}
pub fn all_edge_types(&self) -> Vec<String> {
self.id_to_edge_type
.read()
.iter()
.map(|s| s.to_string())
.collect()
}
pub fn all_property_keys(&self) -> Vec<String> {
let mut keys = std::collections::HashSet::new();
for key in self.node_properties.keys() {
keys.insert(key.to_string());
}
for key in self.edge_properties.keys() {
keys.insert(key.to_string());
}
keys.into_iter().collect()
}
#[must_use]
pub fn peek_next_node_id(&self) -> u64 {
self.next_node_id.load(std::sync::atomic::Ordering::Relaxed)
}
#[must_use]
pub fn peek_next_edge_id(&self) -> u64 {
self.next_edge_id.load(std::sync::atomic::Ordering::Relaxed)
}
#[cfg(not(feature = "temporal"))]
pub fn add_label_versioned(
&self,
node_id: NodeId,
label: &str,
transaction_id: TransactionId,
) -> bool {
let added = self.add_label(node_id, label);
if added {
self.property_undo_log
.write()
.entry(transaction_id)
.or_default()
.push(PropertyUndoEntry::LabelAdded {
node_id,
label: label.to_string(),
});
}
added
}
#[cfg(feature = "temporal")]
pub fn add_label_versioned(
&self,
node_id: NodeId,
label: &str,
transaction_id: TransactionId,
) -> bool {
let label_id = self.get_or_create_label_id(label);
let mut node_labels = self.node_labels.write();
let current = node_labels
.get(&node_id)
.and_then(|log| log.latest())
.cloned()
.unwrap_or_default();
if current.contains(&label_id) {
return false;
}
let mut new_set = current;
new_set.insert(label_id);
node_labels
.entry(node_id)
.or_default()
.append(EpochId::PENDING, new_set);
drop(node_labels);
let mut index = self.label_index.write();
if (label_id as usize) >= index.len() {
index.resize(label_id as usize + 1, FxHashMap::default());
}
index[label_id as usize].insert(node_id, ());
self.property_undo_log
.write()
.entry(transaction_id)
.or_default()
.push(PropertyUndoEntry::LabelAdded {
node_id,
label: label.to_string(),
});
true
}
#[cfg(not(feature = "temporal"))]
pub fn remove_label_versioned(
&self,
node_id: NodeId,
label: &str,
transaction_id: TransactionId,
) -> bool {
let removed = self.remove_label(node_id, label);
if removed {
self.property_undo_log
.write()
.entry(transaction_id)
.or_default()
.push(PropertyUndoEntry::LabelRemoved {
node_id,
label: label.to_string(),
});
}
removed
}
#[cfg(feature = "temporal")]
pub fn remove_label_versioned(
&self,
node_id: NodeId,
label: &str,
transaction_id: TransactionId,
) -> bool {
let label_id = {
let reg = self.label_registry.read();
match reg.get_id(label) {
Some(id) => id,
None => return false,
}
};
let mut node_labels = self.node_labels.write();
let current = node_labels
.get(&node_id)
.and_then(|log| log.latest())
.cloned()
.unwrap_or_default();
if !current.contains(&label_id) {
return false;
}
let mut new_set = current;
new_set.remove(&label_id);
node_labels
.entry(node_id)
.or_default()
.append(EpochId::PENDING, new_set);
drop(node_labels);
let mut index = self.label_index.write();
if (label_id as usize) < index.len() {
index[label_id as usize].remove(&node_id);
}
self.property_undo_log
.write()
.entry(transaction_id)
.or_default()
.push(PropertyUndoEntry::LabelRemoved {
node_id,
label: label.to_string(),
});
true
}
}