use crate::runtime::context::QueryContext;
use crate::runtime::l0::L0Buffer;
use std::collections::HashMap;
use uni_common::Properties;
use uni_common::Value;
use uni_common::core::id::{Eid, Vid};
pub fn is_vertex_deleted(vid: Vid, ctx: Option<&QueryContext>) -> bool {
let ctx = match ctx {
Some(c) => c,
None => return false,
};
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
if tx_l0.vertex_tombstones.contains(&vid) {
return true;
}
}
{
let l0 = ctx.l0.read();
if l0.vertex_tombstones.contains(&vid) {
return true;
}
}
for pending_l0_arc in ctx.pending_flush_l0s.iter().rev() {
let pending_l0 = pending_l0_arc.read();
if pending_l0.vertex_tombstones.contains(&vid) {
return true;
}
}
false
}
pub fn is_edge_deleted(eid: Eid, ctx: Option<&QueryContext>) -> bool {
let ctx = match ctx {
Some(c) => c,
None => return false,
};
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
if tx_l0.tombstones.contains_key(&eid) {
return true;
}
}
{
let l0 = ctx.l0.read();
if l0.tombstones.contains_key(&eid) {
return true;
}
}
for pending_l0_arc in ctx.pending_flush_l0s.iter().rev() {
let pending_l0 = pending_l0_arc.read();
if pending_l0.tombstones.contains_key(&eid) {
return true;
}
}
false
}
pub fn lookup_vertex_prop(vid: Vid, prop: &str, ctx: Option<&QueryContext>) -> Option<Value> {
let ctx = ctx?;
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
if let Some(props) = tx_l0.vertex_properties.get(&vid)
&& let Some(val) = props.get(prop)
{
return Some(val.clone());
}
}
{
let l0 = ctx.l0.read();
if let Some(props) = l0.vertex_properties.get(&vid)
&& let Some(val) = props.get(prop)
{
return Some(val.clone());
}
}
for pending_l0_arc in ctx.pending_flush_l0s.iter().rev() {
let pending_l0 = pending_l0_arc.read();
if let Some(props) = pending_l0.vertex_properties.get(&vid)
&& let Some(val) = props.get(prop)
{
return Some(val.clone());
}
}
None
}
pub fn lookup_edge_prop(eid: Eid, prop: &str, ctx: Option<&QueryContext>) -> Option<Value> {
let ctx = ctx?;
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
if let Some(props) = tx_l0.edge_properties.get(&eid)
&& let Some(val) = props.get(prop)
{
return Some(val.clone());
}
}
{
let l0 = ctx.l0.read();
if let Some(props) = l0.edge_properties.get(&eid)
&& let Some(val) = props.get(prop)
{
return Some(val.clone());
}
}
for pending_l0_arc in ctx.pending_flush_l0s.iter().rev() {
let pending_l0 = pending_l0_arc.read();
if let Some(props) = pending_l0.edge_properties.get(&eid)
&& let Some(val) = props.get(prop)
{
return Some(val.clone());
}
}
None
}
pub fn accumulate_vertex_props(vid: Vid, ctx: Option<&QueryContext>) -> Option<Properties> {
let ctx = ctx?;
let mut result: Option<Properties> = None;
for pending_l0_arc in ctx.pending_flush_l0s.iter() {
let pending_l0 = pending_l0_arc.read();
if let Some(props) = pending_l0.vertex_properties.get(&vid) {
let entry = result.get_or_insert_with(HashMap::new);
for (k, v) in props {
entry.insert(k.clone(), v.clone());
}
}
}
{
let l0 = ctx.l0.read();
if let Some(props) = l0.vertex_properties.get(&vid) {
let entry = result.get_or_insert_with(HashMap::new);
for (k, v) in props {
entry.insert(k.clone(), v.clone());
}
}
}
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
if let Some(props) = tx_l0.vertex_properties.get(&vid) {
let entry = result.get_or_insert_with(HashMap::new);
for (k, v) in props {
entry.insert(k.clone(), v.clone());
}
}
}
result
}
pub fn accumulate_edge_props(eid: Eid, ctx: Option<&QueryContext>) -> Option<Properties> {
let ctx = ctx?;
let mut result: Option<Properties> = None;
for pending_l0_arc in ctx.pending_flush_l0s.iter() {
let pending_l0 = pending_l0_arc.read();
if let Some(props) = pending_l0.edge_properties.get(&eid) {
let entry = result.get_or_insert_with(HashMap::new);
for (k, v) in props {
entry.insert(k.clone(), v.clone());
}
}
}
{
let l0 = ctx.l0.read();
if let Some(props) = l0.edge_properties.get(&eid) {
let entry = result.get_or_insert_with(HashMap::new);
for (k, v) in props {
entry.insert(k.clone(), v.clone());
}
}
}
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
if let Some(props) = tx_l0.edge_properties.get(&eid) {
let entry = result.get_or_insert_with(HashMap::new);
for (k, v) in props {
entry.insert(k.clone(), v.clone());
}
}
}
result
}
pub fn visit_l0_buffers<F>(ctx: Option<&QueryContext>, mut visitor: F) -> bool
where
F: FnMut(&L0Buffer) -> bool,
{
let ctx = match ctx {
Some(c) => c,
None => return false,
};
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
if visitor(&tx_l0) {
return true;
}
}
{
let l0 = ctx.l0.read();
if visitor(&l0) {
return true;
}
}
for pending_l0_arc in ctx.pending_flush_l0s.iter().rev() {
let pending_l0 = pending_l0_arc.read();
if visitor(&pending_l0) {
return true;
}
}
false
}
pub fn overlay_vertex_batch(
vid_to_idx: &HashMap<Vid, usize>,
result: &mut [Properties],
deleted: &mut [bool],
ctx: Option<&QueryContext>,
) {
let ctx = match ctx {
Some(c) => c,
None => return,
};
for pending_l0_arc in ctx.pending_flush_l0s.iter() {
let pending_l0 = pending_l0_arc.read();
overlay_vertex_from_l0(&pending_l0, vid_to_idx, result, deleted);
}
{
let l0 = ctx.l0.read();
overlay_vertex_from_l0(&l0, vid_to_idx, result, deleted);
}
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
overlay_vertex_from_l0(&tx_l0, vid_to_idx, result, deleted);
}
}
fn overlay_vertex_from_l0(
l0: &L0Buffer,
vid_to_idx: &HashMap<Vid, usize>,
result: &mut [Properties],
deleted: &mut [bool],
) {
for vid in &l0.vertex_tombstones {
if let Some(&idx) = vid_to_idx.get(vid) {
deleted[idx] = true;
}
}
for (vid, props) in &l0.vertex_properties {
if let Some(&idx) = vid_to_idx.get(vid) {
for (k, v) in props {
result[idx].insert(k.clone(), v.clone());
}
}
}
}
pub fn overlay_edge_batch(
eid_to_idx: &HashMap<Eid, usize>,
result: &mut [Properties],
deleted: &mut [bool],
ctx: Option<&QueryContext>,
) {
let ctx = match ctx {
Some(c) => c,
None => return,
};
for pending_l0_arc in ctx.pending_flush_l0s.iter() {
let pending_l0 = pending_l0_arc.read();
overlay_edge_from_l0(&pending_l0, eid_to_idx, result, deleted);
}
{
let l0 = ctx.l0.read();
overlay_edge_from_l0(&l0, eid_to_idx, result, deleted);
}
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
overlay_edge_from_l0(&tx_l0, eid_to_idx, result, deleted);
}
}
pub fn vertex_exists_in_l0(vid: Vid, ctx: Option<&QueryContext>) -> bool {
let ctx = match ctx {
Some(c) => c,
None => return false,
};
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
if tx_l0.vertex_properties.contains_key(&vid) {
return true;
}
}
{
let l0 = ctx.l0.read();
if l0.vertex_properties.contains_key(&vid) {
return true;
}
}
for pending_l0_arc in ctx.pending_flush_l0s.iter() {
let pending_l0 = pending_l0_arc.read();
if pending_l0.vertex_properties.contains_key(&vid) {
return true;
}
}
false
}
pub fn get_vertex_labels(vid: Vid, ctx: &QueryContext) -> Vec<String> {
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
if let Some(labels) = tx_l0.get_vertex_labels(vid) {
return labels.to_vec();
}
}
{
let l0 = ctx.l0.read();
if let Some(labels) = l0.get_vertex_labels(vid) {
return labels.to_vec();
}
}
for pending_l0_arc in ctx.pending_flush_l0s.iter().rev() {
let pending_l0 = pending_l0_arc.read();
if let Some(labels) = pending_l0.get_vertex_labels(vid) {
return labels.to_vec();
}
}
Vec::new()
}
pub fn get_vertex_labels_optional(vid: Vid, ctx: &QueryContext) -> Option<Vec<String>> {
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
if let Some(labels) = tx_l0.get_vertex_labels(vid) {
return Some(labels.to_vec());
}
}
{
let l0 = ctx.l0.read();
if let Some(labels) = l0.get_vertex_labels(vid) {
return Some(labels.to_vec());
}
}
for pending_l0_arc in ctx.pending_flush_l0s.iter().rev() {
let pending_l0 = pending_l0_arc.read();
if let Some(labels) = pending_l0.get_vertex_labels(vid) {
return Some(labels.to_vec());
}
}
None
}
pub fn get_edge_type(eid: Eid, ctx: &QueryContext) -> Option<String> {
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
if let Some(edge_type) = tx_l0.get_edge_type(eid) {
return Some(edge_type.to_string());
}
}
{
let l0 = ctx.l0.read();
if let Some(edge_type) = l0.get_edge_type(eid) {
return Some(edge_type.to_string());
}
}
for pending_l0_arc in ctx.pending_flush_l0s.iter().rev() {
let pending_l0 = pending_l0_arc.read();
if let Some(edge_type) = pending_l0.get_edge_type(eid) {
return Some(edge_type.to_string());
}
}
None
}
pub fn get_vertex_properties(vid: Vid, ctx: &QueryContext) -> Option<uni_common::Properties> {
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
if let Some(props) = tx_l0.vertex_properties.get(&vid) {
return Some(props.clone());
}
}
{
let l0 = ctx.l0.read();
if let Some(props) = l0.vertex_properties.get(&vid) {
return Some(props.clone());
}
}
for pending_l0_arc in ctx.pending_flush_l0s.iter().rev() {
let pending_l0 = pending_l0_arc.read();
if let Some(props) = pending_l0.vertex_properties.get(&vid) {
return Some(props.clone());
}
}
None
}
pub fn get_edge_properties(eid: Eid, ctx: &QueryContext) -> Option<uni_common::Properties> {
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
if let Some(props) = tx_l0.edge_properties.get(&eid) {
return Some(props.clone());
}
}
{
let l0 = ctx.l0.read();
if let Some(props) = l0.edge_properties.get(&eid) {
return Some(props.clone());
}
}
for pending_l0_arc in ctx.pending_flush_l0s.iter().rev() {
let pending_l0 = pending_l0_arc.read();
if let Some(props) = pending_l0.edge_properties.get(&eid) {
return Some(props.clone());
}
}
None
}
pub fn edge_exists_in_l0(eid: Eid, ctx: Option<&QueryContext>) -> bool {
let ctx = match ctx {
Some(c) => c,
None => return false,
};
if let Some(tx_l0_arc) = &ctx.transaction_l0 {
let tx_l0 = tx_l0_arc.read();
if tx_l0.edge_endpoints.contains_key(&eid) {
return true;
}
}
{
let l0 = ctx.l0.read();
if l0.edge_endpoints.contains_key(&eid) {
return true;
}
}
for pending_l0_arc in ctx.pending_flush_l0s.iter() {
let pending_l0 = pending_l0_arc.read();
if pending_l0.edge_endpoints.contains_key(&eid) {
return true;
}
}
false
}
fn overlay_edge_from_l0(
l0: &L0Buffer,
eid_to_idx: &HashMap<Eid, usize>,
result: &mut [Properties],
deleted: &mut [bool],
) {
for eid in l0.tombstones.keys() {
if let Some(&idx) = eid_to_idx.get(eid) {
deleted[idx] = true;
}
}
for (eid, props) in &l0.edge_properties {
if let Some(&idx) = eid_to_idx.get(eid) {
for (k, v) in props {
result[idx].insert(k.clone(), v.clone());
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::runtime::l0::L0Buffer;
use parking_lot::RwLock;
use std::sync::Arc;
use uni_common::Value;
fn make_ctx_with_l0(l0: L0Buffer) -> QueryContext {
QueryContext::new(Arc::new(RwLock::new(l0)))
}
#[test]
fn test_is_vertex_deleted_empty_ctx() {
assert!(!is_vertex_deleted(Vid::from(1), None));
}
#[test]
fn test_is_vertex_deleted_in_main_l0() {
let mut l0 = L0Buffer::new(0, None);
l0.vertex_tombstones.insert(Vid::from(1));
let ctx = make_ctx_with_l0(l0);
assert!(is_vertex_deleted(Vid::from(1), Some(&ctx)));
assert!(!is_vertex_deleted(Vid::from(2), Some(&ctx)));
}
#[test]
fn test_lookup_vertex_prop_in_main_l0() {
let mut l0 = L0Buffer::new(0, None);
let mut props = HashMap::new();
props.insert("name".to_string(), Value::String("Alice".to_string()));
l0.vertex_properties.insert(Vid::from(1), props);
let ctx = make_ctx_with_l0(l0);
let result = lookup_vertex_prop(Vid::from(1), "name", Some(&ctx));
assert_eq!(result, Some(Value::String("Alice".to_string())));
let result = lookup_vertex_prop(Vid::from(1), "age", Some(&ctx));
assert_eq!(result, None);
}
#[test]
fn test_accumulate_vertex_props() {
let mut l0 = L0Buffer::new(0, None);
let mut props = HashMap::new();
props.insert("name".to_string(), Value::String("Alice".to_string()));
props.insert("age".to_string(), Value::Int(30));
l0.vertex_properties.insert(Vid::from(1), props);
let ctx = make_ctx_with_l0(l0);
let result = accumulate_vertex_props(Vid::from(1), Some(&ctx));
assert!(result.is_some());
let props = result.unwrap();
assert_eq!(props.get("name"), Some(&Value::String("Alice".to_string())));
assert_eq!(props.get("age"), Some(&Value::Int(30)));
}
#[test]
fn test_transaction_l0_takes_precedence() {
let mut main_l0 = L0Buffer::new(0, None);
let mut main_props = HashMap::new();
main_props.insert("name".to_string(), Value::String("Alice".to_string()));
main_l0.vertex_properties.insert(Vid::from(1), main_props);
let mut tx_l0 = L0Buffer::new(0, None);
let mut tx_props = HashMap::new();
tx_props.insert("name".to_string(), Value::String("Bob".to_string()));
tx_l0.vertex_properties.insert(Vid::from(1), tx_props);
let ctx = QueryContext::new_with_tx(
Arc::new(RwLock::new(main_l0)),
Some(Arc::new(RwLock::new(tx_l0))),
);
let result = lookup_vertex_prop(Vid::from(1), "name", Some(&ctx));
assert_eq!(result, Some(Value::String("Bob".to_string())));
let all_props = accumulate_vertex_props(Vid::from(1), Some(&ctx));
assert_eq!(
all_props.unwrap().get("name"),
Some(&Value::String("Bob".to_string()))
);
}
}