use std::{
collections::HashSet,
sync::{atomic::AtomicUsize, Barrier},
thread,
};
use tugraph::{
cursor::{EdgeCursor, VertexCursor},
field::{FieldData, FieldSpec, FieldType},
txn::{TxnRead, TxnWrite},
};
mod common;
/// Races several optimistic read-write transactions and checks that exactly
/// one of them commits.
///
/// Each worker stages one vertex (keyed by its own loop index) and then waits
/// on a barrier, so no commit is attempted until every transaction already
/// holds an uncommitted write.
#[test]
fn test_concurrent_vertex_add() {
    use std::sync::atomic::Ordering;

    const NTHREADS: usize = 3;

    let galaxy = common::open_galaxy_in_tmpdir().unwrap();
    let graph = galaxy.open_graph("default", false).unwrap();

    // Schema: one vertex label "v" carrying a single mandatory Int64 field "id".
    let id_field = FieldSpec {
        name: "id".into(),
        ty: FieldType::Int64,
        optional: false,
    };
    let added = graph.add_vertex_label("v", &[id_field], "id").unwrap();
    assert!(added);

    let barrier = Barrier::new(NTHREADS);
    let n_success = AtomicUsize::new(0);
    let n_fail = AtomicUsize::new(0);

    thread::scope(|scope| {
        // Shared references are `Copy`, so every `move` closure below gets its own copy.
        let (graph, barrier, n_success, n_fail) = (&graph, &barrier, &n_success, &n_fail);

        // Spawn every worker first; joining happens only after all are running.
        let mut workers = Vec::with_capacity(NTHREADS);
        for id in 0..NTHREADS {
            workers.push(scope.spawn(move || {
                let mut txn = graph.create_rw_txn(true).unwrap();
                txn.add_vertex("v", &["id"], &[FieldData::Int64(id as i64)])
                    .expect("add vertex to optimistic txn before commiting should be ok");

                // Hold the commit back until every worker has staged its write.
                barrier.wait();

                let tally = if txn.commit().is_ok() {
                    n_success
                } else {
                    n_fail
                };
                tally.fetch_add(1, Ordering::Relaxed);
            }));
        }

        for worker in workers {
            worker.join().unwrap();
        }
    });

    let ro_txn = graph.create_ro_txn().unwrap();
    let committed = n_success.load(Ordering::Relaxed);
    assert_eq!(committed, 1);
    let rejected = n_fail.load(Ordering::Relaxed);
    assert_eq!(rejected, NTHREADS - 1);
    // Only the winning transaction's vertex is visible afterwards.
    assert_eq!(ro_txn.num_vertices().unwrap(), 1);
}
/// Exercises read-write transactions created concurrently from many threads.
///
/// Phase one: each thread creates a transaction with `create_rw_txn(false)`
/// and, while still holding the result, asserts through a shared counter that
/// no other thread is inside the same section.
///
/// Phase two: each thread creates a transaction with `create_rw_txn(true)` and
/// only asserts that creation succeeded.
#[test]
fn test_rw_txn_co_existence() {
    use std::sync::atomic::Ordering;

    const NUM_WORKERS: usize = 10;

    let galaxy = common::open_galaxy_in_tmpdir().unwrap();
    let graph = galaxy.open_graph("default", false).unwrap();
    let num_rw_txn = AtomicUsize::new(0);

    thread::scope(|scope| {
        for _ in 0..NUM_WORKERS {
            scope.spawn(|| {
                // NOTE(review): the creation result is deliberately left unchecked here,
                // matching the existing behavior — confirm whether it should be unwrapped.
                let _held = graph.create_rw_txn(false);

                // The counter must read 0 on entry if no other thread is in this section.
                let others_inside = num_rw_txn.fetch_add(1, Ordering::Relaxed);
                assert_eq!(others_inside, 0);
                num_rw_txn.fetch_sub(1, Ordering::Relaxed);
            });
        }
    });

    thread::scope(|scope| {
        for _ in 0..NUM_WORKERS {
            scope.spawn(|| {
                let created = graph.create_rw_txn(true);
                assert!(created.is_ok());
            });
        }
    });
}
/// Checks that read-only transactions can be created from many threads at
/// once: every one of the spawned threads must get an `Ok` result.
///
/// The graph is opened with its second argument set to `true`
/// (presumably read-only mode — confirm against `open_graph`).
#[test]
fn test_ro_txn_co_existence() {
    const NUM_READERS: usize = 10;

    let galaxy = common::open_galaxy_in_tmpdir().unwrap();
    let graph = galaxy.open_graph("default", true).unwrap();

    thread::scope(|scope| {
        for _ in 0..NUM_READERS {
            scope.spawn(|| {
                let created = graph.create_ro_txn();
                assert!(created.is_ok());
            });
        }
    });
}
/// Verifies that a second transaction cannot be created on the same thread
/// while another one is still alive.
///
/// The outer transaction is kept in scope for the duration of each case and
/// every attempt to create a nested one is expected to return an error.
#[test]
fn test_txn_nest() {
    let galaxy = common::open_galaxy_in_tmpdir().unwrap();
    let graph = galaxy.open_graph("default", false).unwrap();

    // Case 1: a live read-only transaction rejects nested read-only and read-write ones.
    {
        let _outer = graph.create_ro_txn().unwrap();
        assert!(graph.create_ro_txn().is_err());
        assert!(graph.create_rw_txn(false).is_err());
    }

    // Remaining cases: every combination of the flag passed to `create_rw_txn`
    // for the outer and the nested read-write transaction.
    let rw_cases = [(false, false), (false, true), (true, false), (true, true)];
    for (outer_flag, nested_flag) in rw_cases {
        let _outer = graph.create_rw_txn(outer_flag).unwrap();
        assert!(graph.create_ro_txn().is_err());
        assert!(graph.create_rw_txn(nested_flag).is_err());
    }
}
/// End-to-end check of schema creation, writes through a read-write
/// transaction, and reads through a read-only transaction.
///
/// Steps:
/// 1. A freshly opened graph reports no vertex or edge labels.
/// 2. Vertex labels `src`/`dst` and edge label `edge` are created.
/// 3. Vertices and edges are inserted twice: once addressing labels/fields by
///    name, once by their numeric ids (with ids offset by `BY_ID_OFFSET` so
///    the two batches do not collide).
/// 4. A read-only transaction verifies the schema, every vertex `id` field and
///    every edge `tid` field.
#[test]
fn test_txn_readwrite() {
    const NUM_ADDED_BY_NAME: usize = 4;
    const NUM_ADDED_BY_ID: usize = 4;
    // Added to the index of every vertex inserted through the by-id API.
    const BY_ID_OFFSET: usize = 100;

    let galaxy = common::open_galaxy_in_tmpdir().unwrap();
    let graph = galaxy.open_graph("default", false).unwrap();

    // Step 1: nothing has been defined yet.
    {
        let ro_txn = graph.create_ro_txn().unwrap();
        assert!(ro_txn.is_valid());
        assert!(ro_txn.is_read_only());
        assert_eq!(ro_txn.num_edge_labels().unwrap(), 0);
        assert_eq!(ro_txn.num_vertex_labels().unwrap(), 0);
        assert!(ro_txn.all_vertex_labels().unwrap().is_empty());
        assert!(ro_txn.all_edge_labels().unwrap().is_empty());
    }

    // Step 2: both vertex labels share the same single-string-field layout.
    for label in ["src", "dst"] {
        assert!(graph
            .add_vertex_label(
                label,
                &[FieldSpec {
                    name: "id".into(),
                    ty: FieldType::String,
                    optional: false,
                }],
                "id",
            )
            .unwrap());
    }
    assert!(graph
        .add_edge_label(
            "edge",
            &[FieldSpec {
                name: "tid".into(),
                ty: FieldType::String,
                optional: false,
            }],
            "",
            [("src", "dst")],
        )
        .unwrap());

    // Step 3: insert data and remember every (src, dst) pair that got an edge.
    let all_edges = {
        let mut rw_txn = graph.create_rw_txn(false).unwrap();
        assert!(rw_txn.is_valid());
        assert!(!rw_txn.is_read_only());
        let mut edges = vec![];

        // Batch A: labels and fields addressed by name.
        for i in 0..NUM_ADDED_BY_NAME {
            let src = rw_txn
                .add_vertex("src", &["id"], &[FieldData::String(format!("src_{i}"))])
                .unwrap();
            let dst = rw_txn
                .add_vertex("dst", &["id"], &[FieldData::String(format!("dst_{i}"))])
                .unwrap();
            let euid = rw_txn
                .add_edge(
                    src,
                    dst,
                    "edge",
                    &["tid"],
                    &[FieldData::String(format!("edge_{src}_{dst}"))],
                )
                .unwrap();
            assert!(euid.src == src && euid.dst == dst);
            edges.push((src, dst));
        }

        // Batch B: the same operations, addressed by label id / field id.
        let src_lid = rw_txn.vertex_label_id("src").unwrap();
        let src_id_fid = rw_txn.vertex_field_id(src_lid, "id").unwrap();
        let dst_lid = rw_txn.vertex_label_id("dst").unwrap();
        let dst_id_fid = rw_txn.vertex_field_id(dst_lid, "id").unwrap();
        let edge_lid = rw_txn.edge_label_id("edge").unwrap();
        let edge_tid_fid = rw_txn.edge_field_id(edge_lid, "tid").unwrap();
        for i in (0..NUM_ADDED_BY_ID).map(|i| i + BY_ID_OFFSET) {
            let src = rw_txn
                .add_vertex_by_id(
                    src_lid,
                    &[src_id_fid],
                    &[FieldData::String(format!("src_{i}"))],
                )
                .unwrap();
            let dst = rw_txn
                .add_vertex_by_id(
                    dst_lid,
                    &[dst_id_fid],
                    &[FieldData::String(format!("dst_{i}"))],
                )
                .unwrap();
            let euid = rw_txn
                .add_edge_by_id(
                    src,
                    dst,
                    edge_lid,
                    &[edge_tid_fid],
                    &[FieldData::String(format!("edge_{src}_{dst}"))],
                )
                .unwrap();
            assert!(euid.src == src && euid.dst == dst);
            edges.push((src, dst));
        }

        rw_txn.commit().unwrap();
        edges
    };

    // Step 4: read everything back.
    {
        let ro_txn = graph.create_ro_txn().unwrap();

        // Schema.
        assert_eq!(ro_txn.num_edge_labels().unwrap(), 1);
        assert_eq!(ro_txn.num_vertex_labels().unwrap(), 2);
        assert_eq!(
            ro_txn
                .all_vertex_labels()
                .unwrap()
                .into_iter()
                .collect::<HashSet<String>>(),
            HashSet::from_iter(["src".into(), "dst".into()])
        );
        assert_eq!(ro_txn.all_edge_labels().unwrap(), vec!["edge"]);
        let src_fields = ro_txn.vertex_schema("src").unwrap();
        assert_eq!(src_fields.len(), 1);
        assert_eq!(src_fields.first().unwrap().name, "id");
        let edge_fields = ro_txn.edge_schema("edge").unwrap();
        assert_eq!(edge_fields.len(), 1);
        assert_eq!(edge_fields.first().unwrap().name, "tid");

        // Vertex data: every vertex carries exactly one string field.
        let all_vertex_fields: HashSet<_> = ro_txn
            .vertex_cur()
            .unwrap()
            .into_vertex_fields()
            .map(|fd| {
                assert_eq!(fd.len(), 1);
                match fd.first().unwrap() {
                    FieldData::String(v) => v.clone(),
                    _ => panic!("field data should be string type"),
                }
            })
            .collect();
        // Mirror the two write batches exactly: plain indices for the by-name
        // batch, offset indices for the by-id batch. Using the matching
        // constant for each range keeps this correct if the counts diverge.
        let expected_vertex_fields: HashSet<String> = (0..NUM_ADDED_BY_NAME)
            .chain((0..NUM_ADDED_BY_ID).map(|i| i + BY_ID_OFFSET))
            .flat_map(|id| [format!("src_{id}"), format!("dst_{id}")])
            .collect();
        assert_eq!(all_vertex_fields, expected_vertex_fields);

        // Edge data: walk the out-edges of every recorded source vertex.
        let all_edge_fields: HashSet<_> = all_edges
            .iter()
            .flat_map(|e| {
                ro_txn
                    .vertex_cur()
                    .unwrap()
                    .seek(e.0, false)
                    .unwrap()
                    .out_edge_cursor()
                    .unwrap()
                    .into_edge_fields()
                    .map(|fd| {
                        assert_eq!(fd.len(), 1);
                        match fd.first().unwrap() {
                            FieldData::String(v) => v.clone(),
                            _ => panic!("field data should be string type"),
                        }
                    })
                    .collect::<Vec<_>>()
            })
            .collect();
        let expected_edge_fields: HashSet<String> = all_edges
            .iter()
            .map(|e| format!("edge_{}_{}", e.0, e.1))
            .collect();
        assert_eq!(all_edge_fields, expected_edge_fields);
    }
}