use crate::api::Uni;
use crate::api::locy_builder::{LocyBuilder, TxLocyBuilder};
use crate::api::session::{QueryBuilder, Session, TransactionBuilder};
use crate::api::transaction::{
ApplyBuilder, ApplyResult, CommitResult, Transaction, TxQueryBuilder,
};
use std::sync::Arc;
use uni_common::core::schema::{DataType, Schema};
use uni_common::{Result, UniError, Value};
use uni_locy::DerivedFactSet;
use crate::api::locy_result::LocyResult;
use uni_query::{ExecuteResult, QueryResult, Row};
/// Blocking facade over [`Uni`] that owns the Tokio runtime used to drive the async API.
pub struct UniSync {
/// Wrapped database handle; becomes `None` once `shutdown` has taken it.
inner: Option<Uni>,
/// Runtime on which the wrapped futures are executed via `block_on`.
rt: tokio::runtime::Runtime,
}
impl UniSync {
    /// Wraps an existing [`Uni`] handle together with a newly created Tokio runtime.
    ///
    /// # Errors
    ///
    /// Returns [`UniError::Io`] if the runtime cannot be created.
    pub fn new(inner: Uni) -> Result<Self> {
        let rt = tokio::runtime::Runtime::new().map_err(UniError::Io)?;
        Ok(Self {
            inner: Some(inner),
            rt,
        })
    }

    /// Creates a runtime and uses it to build an in-memory [`Uni`] instance.
    ///
    /// # Errors
    ///
    /// Returns [`UniError::Io`] if the runtime cannot be created, or whatever error the
    /// in-memory builder reports.
    pub fn in_memory() -> Result<Self> {
        let rt = tokio::runtime::Runtime::new().map_err(UniError::Io)?;
        let inner = rt.block_on(Uni::in_memory().build())?;
        Ok(Self {
            inner: Some(inner),
            rt,
        })
    }

    /// Borrows the wrapped handle.
    ///
    /// # Panics
    ///
    /// Panics if the handle has already been taken by `shutdown`.
    fn inner(&self) -> &Uni {
        self.inner.as_ref().expect("UniSync already shut down")
    }

    /// Opens a session whose async operations are driven by this wrapper's runtime.
    pub fn session(&self) -> SessionSync<'_> {
        SessionSync {
            session: self.inner().session(),
            rt: &self.rt,
        }
    }

    /// Returns the value of `schema().current()` on the wrapped handle.
    pub fn schema_meta(&self) -> Arc<Schema> {
        self.inner().schema().current()
    }

    /// Starts a blocking schema builder backed by the wrapped handle's schema builder.
    pub fn schema(&self) -> SchemaBuilderSync<'_> {
        SchemaBuilderSync {
            inner: self.inner().schema(),
            rt: &self.rt,
        }
    }

    /// Consumes the wrapper and runs the wrapped handle's async `shutdown` to completion.
    ///
    /// Returns `Ok(())` without doing anything if the handle was already taken.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the wrapped `shutdown`.
    pub fn shutdown(mut self) -> Result<()> {
        match self.inner.take() {
            // `inner` is `None` from here on, so the `Drop` impl has nothing left to do.
            // `self` is therefore dropped normally, which releases the runtime (and its
            // worker threads) instead of leaking it through `mem::forget`.
            Some(uni) => self.rt.block_on(uni.shutdown()),
            None => Ok(()),
        }
    }
}
impl Drop for UniSync {
    /// If the handle is still present (i.e. `shutdown` never took it), calls
    /// `shutdown_blocking` on its shutdown handle and emits a debug trace.
    fn drop(&mut self) {
        match self.inner.as_ref() {
            Some(uni) => {
                uni.inner.shutdown_handle.shutdown_blocking();
                tracing::debug!("UniSync dropped");
            }
            // Handle already taken: nothing to signal.
            None => {}
        }
    }
}
/// Blocking wrapper around a [`Session`], borrowing the runtime that drives its futures.
pub struct SessionSync<'a> {
/// The wrapped async session.
session: Session,
/// Borrowed runtime used for `block_on`.
rt: &'a tokio::runtime::Runtime,
}
impl<'a> SessionSync<'a> {
    /// Blocks on the session's `query` for `cypher` and returns its result.
    pub fn query(&self, cypher: &str) -> Result<QueryResult> {
        let rt = self.rt;
        let pending = self.session.query(cypher);
        rt.block_on(pending)
    }

    /// Wraps the session's `query_with` builder so it can be finished synchronously.
    pub fn query_with<'s>(&'s self, cypher: &str) -> QueryBuilderSync<'s, 'a> {
        let inner = self.session.query_with(cypher);
        QueryBuilderSync { inner, rt: self.rt }
    }

    /// Blocks on the session's `locy` for `program` and returns its result.
    pub fn locy(&self, program: &str) -> Result<LocyResult> {
        let rt = self.rt;
        let pending = self.session.locy(program);
        rt.block_on(pending)
    }

    /// Wraps the session's `locy_with` builder so it can be run synchronously.
    pub fn locy_with<'s>(&'s self, program: &str) -> LocyBuilderSync<'s, 'a> {
        let inner = self.session.locy_with(program);
        LocyBuilderSync { inner, rt: self.rt }
    }

    /// Returns the session's rule registry (direct delegation, no blocking involved).
    pub fn rules(&self) -> crate::api::rule_registry::RuleRegistry<'_> {
        self.session.rules()
    }

    /// Delegates to the session's `compile_locy`; the call is already synchronous.
    pub fn compile_locy(&self, program: &str) -> Result<uni_locy::CompiledProgram> {
        self.session.compile_locy(program)
    }

    /// Blocks on the session's `tx` and wraps the resulting transaction.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the session's `tx`.
    pub fn tx(&self) -> Result<TransactionSync<'a>> {
        let rt = self.rt;
        let tx = rt.block_on(self.session.tx())?;
        Ok(TransactionSync { tx, rt })
    }

    /// Wraps the session's `tx_with` builder so the transaction can be started synchronously.
    pub fn tx_with(&self) -> TransactionBuilderSync<'_, 'a> {
        let inner = self.session.tx_with();
        TransactionBuilderSync { inner, rt: self.rt }
    }

    /// Returns the session's `watch` stream unchanged.
    pub fn watch(&self) -> crate::api::notifications::CommitStream {
        self.session.watch()
    }

    /// Returns the session's `watch_with` builder unchanged.
    pub fn watch_with(&self) -> crate::api::notifications::WatchBuilder {
        self.session.watch_with()
    }

    /// Forwards `name` and `hook` to the session's `add_hook`.
    pub fn add_hook(
        &mut self,
        name: impl Into<String>,
        hook: impl crate::api::hooks::SessionHook + 'static,
    ) {
        let session = &mut self.session;
        session.add_hook(name, hook)
    }

    /// Forwards to the session's `remove_hook` and returns its boolean result.
    pub fn remove_hook(&mut self, name: &str) -> bool {
        let session = &mut self.session;
        session.remove_hook(name)
    }

    /// Returns the hook names reported by the session's `list_hooks`.
    pub fn list_hooks(&self) -> Vec<String> {
        self.session.list_hooks()
    }

    /// Forwards to the session's `clear_hooks`.
    pub fn clear_hooks(&mut self) {
        let session = &mut self.session;
        session.clear_hooks()
    }

    /// Blocks on the session's `pin_to_version` for `snapshot_id`.
    pub fn pin_to_version(&mut self, snapshot_id: &str) -> Result<()> {
        let rt = self.rt;
        let pending = self.session.pin_to_version(snapshot_id);
        rt.block_on(pending)
    }

    /// Blocks on the session's `pin_to_timestamp` for `ts`.
    pub fn pin_to_timestamp(&mut self, ts: chrono::DateTime<chrono::Utc>) -> Result<()> {
        let rt = self.rt;
        let pending = self.session.pin_to_timestamp(ts);
        rt.block_on(pending)
    }

    /// Blocks on the session's `refresh`.
    pub fn refresh(&mut self) -> Result<()> {
        let rt = self.rt;
        let pending = self.session.refresh();
        rt.block_on(pending)
    }

    /// Blocks on the session's `prepare` for `cypher` and returns the prepared query.
    pub fn prepare(&self, cypher: &str) -> Result<crate::api::prepared::PreparedQuery> {
        let rt = self.rt;
        let pending = self.session.prepare(cypher);
        rt.block_on(pending)
    }

    /// Blocks on the session's `prepare_locy` for `program` and returns the prepared program.
    pub fn prepare_locy(&self, program: &str) -> Result<crate::api::prepared::PreparedLocy> {
        let rt = self.rt;
        let pending = self.session.prepare_locy(program);
        rt.block_on(pending)
    }

    /// Returns the session's `params` accessor.
    pub fn params(&self) -> crate::api::session::Params<'_> {
        self.session.params()
    }

    /// Returns the identifier reported by the session's `id`.
    pub fn id(&self) -> &str {
        self.session.id()
    }

    /// Returns the session's `capabilities`.
    pub fn capabilities(&self) -> crate::api::session::SessionCapabilities {
        self.session.capabilities()
    }

    /// Returns the session's `metrics`.
    pub fn metrics(&self) -> crate::api::session::SessionMetrics {
        self.session.metrics()
    }

    /// Forwards to the session's `cancel`.
    pub fn cancel(&self) {
        self.session.cancel()
    }
}
/// Blocking wrapper around a session-level [`QueryBuilder`].
pub struct QueryBuilderSync<'s, 'a> {
/// The wrapped async query builder.
inner: QueryBuilder<'s>,
/// Borrowed runtime used to run the builder's terminal futures.
rt: &'a tokio::runtime::Runtime,
}
impl<'s, 'a> QueryBuilderSync<'s, 'a> {
    /// Passes `key` and `value` to the wrapped builder's `param` and returns the updated wrapper.
    pub fn param<K: Into<String>, V: Into<Value>>(self, key: K, value: V) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.param(key, value),
            rt,
        }
    }

    /// Passes `params` to the wrapped builder's `params` and returns the updated wrapper.
    pub fn params<'p>(self, params: impl IntoIterator<Item = (&'p str, Value)>) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.params(params),
            rt,
        }
    }

    /// Passes `duration` to the wrapped builder's `timeout` and returns the updated wrapper.
    pub fn timeout(self, duration: std::time::Duration) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.timeout(duration),
            rt,
        }
    }

    /// Passes `bytes` to the wrapped builder's `max_memory` and returns the updated wrapper.
    pub fn max_memory(self, bytes: usize) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.max_memory(bytes),
            rt,
        }
    }

    /// Consumes the builder and blocks on the wrapped `fetch_all`.
    pub fn fetch_all(self) -> Result<QueryResult> {
        let Self { inner, rt } = self;
        rt.block_on(inner.fetch_all())
    }

    /// Consumes the builder and blocks on the wrapped `fetch_one`.
    pub fn fetch_one(self) -> Result<Option<Row>> {
        let Self { inner, rt } = self;
        rt.block_on(inner.fetch_one())
    }
}
/// Blocking wrapper around a session-level [`LocyBuilder`].
pub struct LocyBuilderSync<'s, 'a> {
/// The wrapped async Locy builder.
inner: LocyBuilder<'s>,
/// Borrowed runtime used to run the builder's terminal future.
rt: &'a tokio::runtime::Runtime,
}
impl<'s, 'a> LocyBuilderSync<'s, 'a> {
    /// Passes `name` and `value` to the wrapped builder's `param` and returns the updated wrapper.
    pub fn param(self, name: &str, value: impl Into<Value>) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.param(name, value),
            rt,
        }
    }

    /// Passes `params` to the wrapped builder's `params` and returns the updated wrapper.
    pub fn params<'p>(self, params: impl IntoIterator<Item = (&'p str, Value)>) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.params(params),
            rt,
        }
    }

    /// Passes `duration` to the wrapped builder's `timeout` and returns the updated wrapper.
    pub fn timeout(self, duration: std::time::Duration) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.timeout(duration),
            rt,
        }
    }

    /// Passes `n` to the wrapped builder's `max_iterations` and returns the updated wrapper.
    pub fn max_iterations(self, n: usize) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.max_iterations(n),
            rt,
        }
    }

    /// Passes `config` to the wrapped builder's `with_config` and returns the updated wrapper.
    pub fn with_config(self, config: uni_locy::LocyConfig) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.with_config(config),
            rt,
        }
    }

    /// Consumes the builder and blocks on the wrapped `run`.
    pub fn run(self) -> Result<LocyResult> {
        let Self { inner, rt } = self;
        rt.block_on(inner.run())
    }
}
/// Blocking wrapper around a [`Transaction`].
pub struct TransactionSync<'a> {
/// The wrapped async transaction.
tx: Transaction,
/// Borrowed runtime used for `block_on`.
rt: &'a tokio::runtime::Runtime,
}
impl<'a> TransactionSync<'a> {
    /// Blocks on the transaction's `query` for `cypher`.
    pub fn query(&self, cypher: &str) -> Result<QueryResult> {
        let pending = self.tx.query(cypher);
        self.rt.block_on(pending)
    }

    /// Wraps the transaction's `query_with` builder for synchronous use.
    pub fn query_with<'t>(&'t self, cypher: &str) -> TxQueryBuilderSync<'t, 'a> {
        let inner = self.tx.query_with(cypher);
        TxQueryBuilderSync { inner, rt: self.rt }
    }

    /// Blocks on the transaction's `execute` for `cypher`.
    pub fn execute(&self, cypher: &str) -> Result<ExecuteResult> {
        let pending = self.tx.execute(cypher);
        self.rt.block_on(pending)
    }

    /// Wraps the transaction's `execute_with` builder for synchronous use.
    pub fn execute_with<'t>(&'t self, cypher: &str) -> ExecuteBuilderSync<'t, 'a> {
        let inner = self.tx.execute_with(cypher);
        ExecuteBuilderSync { inner, rt: self.rt }
    }

    /// Blocks on the transaction's `locy` for `program`.
    pub fn locy(&self, program: &str) -> Result<LocyResult> {
        let pending = self.tx.locy(program);
        self.rt.block_on(pending)
    }

    /// Wraps the transaction's `locy_with` builder for synchronous use.
    pub fn locy_with<'t>(&'t self, program: &str) -> TxLocyBuilderSync<'t, 'a> {
        let inner = self.tx.locy_with(program);
        TxLocyBuilderSync { inner, rt: self.rt }
    }

    /// Blocks on the transaction's `apply` for `derived`.
    pub fn apply(&self, derived: DerivedFactSet) -> Result<ApplyResult> {
        let pending = self.tx.apply(derived);
        self.rt.block_on(pending)
    }

    /// Wraps the transaction's `apply_with` builder for synchronous use.
    pub fn apply_with(&self, derived: DerivedFactSet) -> ApplyBuilderSync<'_, 'a> {
        let inner = self.tx.apply_with(derived);
        ApplyBuilderSync { inner, rt: self.rt }
    }

    /// Blocks on the transaction's `prepare` for `cypher`.
    pub fn prepare(&self, cypher: &str) -> Result<crate::api::prepared::PreparedQuery> {
        let pending = self.tx.prepare(cypher);
        self.rt.block_on(pending)
    }

    /// Blocks on the transaction's `prepare_locy` for `program`.
    pub fn prepare_locy(&self, program: &str) -> Result<crate::api::prepared::PreparedLocy> {
        let pending = self.tx.prepare_locy(program);
        self.rt.block_on(pending)
    }

    /// Consumes the wrapper and blocks on the transaction's `commit`.
    pub fn commit(self) -> Result<CommitResult> {
        let Self { tx, rt } = self;
        rt.block_on(tx.commit())
    }

    /// Consumes the wrapper and calls the transaction's `rollback`.
    pub fn rollback(self) {
        let Self { tx, .. } = self;
        tx.rollback()
    }

    /// Returns the transaction's `bulk_writer` builder unchanged.
    pub fn bulk_writer(&self) -> crate::api::bulk::BulkWriterBuilder {
        self.tx.bulk_writer()
    }

    /// Returns the transaction's `appender` builder for `label` unchanged.
    pub fn appender(&self, label: &str) -> crate::api::appender::AppenderBuilder {
        self.tx.appender(label)
    }

    /// Blocks on the transaction's `bulk_insert_vertices` and returns the resulting ids.
    pub fn bulk_insert_vertices(
        &self,
        label: &str,
        properties_list: Vec<uni_common::Properties>,
    ) -> Result<Vec<uni_common::core::id::Vid>> {
        let pending = self.tx.bulk_insert_vertices(label, properties_list);
        self.rt.block_on(pending)
    }

    /// Blocks on the transaction's `bulk_insert_edges` for the given edge tuples.
    pub fn bulk_insert_edges(
        &self,
        edge_type: &str,
        edges: Vec<(
            uni_common::core::id::Vid,
            uni_common::core::id::Vid,
            uni_common::Properties,
        )>,
    ) -> Result<()> {
        let pending = self.tx.bulk_insert_edges(edge_type, edges);
        self.rt.block_on(pending)
    }

    /// Returns the transaction's `is_dirty` flag.
    pub fn is_dirty(&self) -> bool {
        self.tx.is_dirty()
    }

    /// Returns the identifier reported by the transaction's `id`.
    pub fn id(&self) -> &str {
        self.tx.id()
    }
}
/// Blocking wrapper around a transaction-level `ExecuteBuilder`.
pub struct ExecuteBuilderSync<'t, 'a> {
/// The wrapped async execute builder.
inner: crate::api::transaction::ExecuteBuilder<'t>,
/// Borrowed runtime used to run the builder's terminal future.
rt: &'a tokio::runtime::Runtime,
}
impl<'t, 'a> ExecuteBuilderSync<'t, 'a> {
    /// Passes `key` and `value` to the wrapped builder's `param` and returns the updated wrapper.
    pub fn param<K: Into<String>, V: Into<Value>>(self, key: K, value: V) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.param(key, value),
            rt,
        }
    }

    /// Passes `params` to the wrapped builder's `params` and returns the updated wrapper.
    pub fn params<'p>(self, params: impl IntoIterator<Item = (&'p str, Value)>) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.params(params),
            rt,
        }
    }

    /// Passes `duration` to the wrapped builder's `timeout` and returns the updated wrapper.
    pub fn timeout(self, duration: std::time::Duration) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.timeout(duration),
            rt,
        }
    }

    /// Consumes the builder and blocks on the wrapped `run`.
    pub fn run(self) -> Result<ExecuteResult> {
        let Self { inner, rt } = self;
        rt.block_on(inner.run())
    }
}
/// Blocking wrapper around a [`TransactionBuilder`].
pub struct TransactionBuilderSync<'s, 'a> {
/// The wrapped async transaction builder.
inner: TransactionBuilder<'s>,
/// Borrowed runtime used to run `start`; also handed to the resulting transaction wrapper.
rt: &'a tokio::runtime::Runtime,
}
impl<'s, 'a> TransactionBuilderSync<'s, 'a> {
    /// Passes `d` to the wrapped builder's `timeout` and returns the updated wrapper.
    pub fn timeout(self, d: std::time::Duration) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.timeout(d),
            rt,
        }
    }

    /// Passes `level` to the wrapped builder's `isolation` and returns the updated wrapper.
    pub fn isolation(self, level: crate::api::transaction::IsolationLevel) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.isolation(level),
            rt,
        }
    }

    /// Consumes the builder, blocks on the wrapped `start`, and wraps the new transaction.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the wrapped `start`.
    pub fn start(self) -> Result<TransactionSync<'a>> {
        let Self { inner, rt } = self;
        let tx = rt.block_on(inner.start())?;
        Ok(TransactionSync { tx, rt })
    }
}
/// Blocking wrapper around a transaction-level [`TxQueryBuilder`].
pub struct TxQueryBuilderSync<'t, 'a> {
/// The wrapped async query builder.
inner: TxQueryBuilder<'t>,
/// Borrowed runtime used to run the builder's terminal futures.
rt: &'a tokio::runtime::Runtime,
}
impl<'t, 'a> TxQueryBuilderSync<'t, 'a> {
    /// Passes `name` and `value` to the wrapped builder's `param` and returns the updated wrapper.
    pub fn param(self, name: &str, value: impl Into<Value>) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.param(name, value),
            rt,
        }
    }

    /// Consumes the builder and blocks on the wrapped `execute`.
    pub fn execute(self) -> Result<ExecuteResult> {
        let Self { inner, rt } = self;
        rt.block_on(inner.execute())
    }

    /// Consumes the builder and blocks on the wrapped `fetch_all`.
    pub fn fetch_all(self) -> Result<QueryResult> {
        let Self { inner, rt } = self;
        rt.block_on(inner.fetch_all())
    }

    /// Consumes the builder and blocks on the wrapped `fetch_one`.
    pub fn fetch_one(self) -> Result<Option<Row>> {
        let Self { inner, rt } = self;
        rt.block_on(inner.fetch_one())
    }
}
/// Blocking wrapper around an [`ApplyBuilder`].
pub struct ApplyBuilderSync<'t, 'a> {
/// The wrapped async apply builder.
inner: ApplyBuilder<'t>,
/// Borrowed runtime used to run the builder's terminal future.
rt: &'a tokio::runtime::Runtime,
}
impl<'t, 'a> ApplyBuilderSync<'t, 'a> {
    /// Calls the wrapped builder's `require_fresh` and returns the updated wrapper.
    pub fn require_fresh(self) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.require_fresh(),
            rt,
        }
    }

    /// Passes `n` to the wrapped builder's `max_version_gap` and returns the updated wrapper.
    pub fn max_version_gap(self, n: u64) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.max_version_gap(n),
            rt,
        }
    }

    /// Consumes the builder and blocks on the wrapped `run`.
    pub fn run(self) -> Result<ApplyResult> {
        let Self { inner, rt } = self;
        rt.block_on(inner.run())
    }
}
/// Blocking wrapper around a transaction-level [`TxLocyBuilder`].
pub struct TxLocyBuilderSync<'t, 'a> {
/// The wrapped async Locy builder.
inner: TxLocyBuilder<'t>,
/// Borrowed runtime used to run the builder's terminal future.
rt: &'a tokio::runtime::Runtime,
}
impl<'t, 'a> TxLocyBuilderSync<'t, 'a> {
    /// Passes `name` and `value` to the wrapped builder's `param` and returns the updated wrapper.
    pub fn param(self, name: &str, value: impl Into<Value>) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.param(name, value),
            rt,
        }
    }

    /// Passes `params` to the wrapped builder's `params` and returns the updated wrapper.
    pub fn params<'p>(self, params: impl IntoIterator<Item = (&'p str, Value)>) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.params(params),
            rt,
        }
    }

    /// Passes `duration` to the wrapped builder's `timeout` and returns the updated wrapper.
    pub fn timeout(self, duration: std::time::Duration) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.timeout(duration),
            rt,
        }
    }

    /// Passes `n` to the wrapped builder's `max_iterations` and returns the updated wrapper.
    pub fn max_iterations(self, n: usize) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.max_iterations(n),
            rt,
        }
    }

    /// Passes `config` to the wrapped builder's `with_config` and returns the updated wrapper.
    pub fn with_config(self, config: uni_locy::LocyConfig) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.with_config(config),
            rt,
        }
    }

    /// Consumes the builder and blocks on the wrapped `run`.
    pub fn run(self) -> Result<LocyResult> {
        let Self { inner, rt } = self;
        rt.block_on(inner.run())
    }
}
/// Blocking wrapper around the async `SchemaBuilder`.
pub struct SchemaBuilderSync<'a> {
/// The wrapped async schema builder.
inner: crate::api::schema::SchemaBuilder<'a>,
/// Borrowed runtime used to run `apply`; passed on to the label/edge-type wrappers.
rt: &'a tokio::runtime::Runtime,
}
impl<'a> SchemaBuilderSync<'a> {
    /// Consumes the builder and wraps the result of the inner builder's `label(name)`.
    pub fn label(self, name: &str) -> LabelBuilderSync<'a> {
        let Self { inner, rt } = self;
        LabelBuilderSync {
            inner: inner.label(name),
            rt,
        }
    }

    /// Consumes the builder and wraps the result of the inner builder's
    /// `edge_type(name, from, to)`.
    pub fn edge_type(self, name: &str, from: &[&str], to: &[&str]) -> EdgeTypeBuilderSync<'a> {
        let Self { inner, rt } = self;
        EdgeTypeBuilderSync {
            inner: inner.edge_type(name, from, to),
            rt,
        }
    }

    /// Consumes the builder and blocks on the wrapped `apply`.
    pub fn apply(self) -> Result<()> {
        let Self { inner, rt } = self;
        rt.block_on(inner.apply())
    }
}
/// Blocking wrapper around the async `LabelBuilder`.
pub struct LabelBuilderSync<'a> {
/// The wrapped async label builder.
inner: crate::api::schema::LabelBuilder<'a>,
/// Borrowed runtime used to run `apply`; passed back to the schema wrapper by `done`.
rt: &'a tokio::runtime::Runtime,
}
impl<'a> LabelBuilderSync<'a> {
    /// Passes `name` and `data_type` to the wrapped builder's `property` and returns the
    /// updated wrapper.
    pub fn property(self, name: &str, data_type: DataType) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.property(name, data_type),
            rt,
        }
    }

    /// Passes `name` and `data_type` to the wrapped builder's `property_nullable` and
    /// returns the updated wrapper.
    pub fn property_nullable(self, name: &str, data_type: DataType) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.property_nullable(name, data_type),
            rt,
        }
    }

    /// Passes `name` and `dimensions` to the wrapped builder's `vector` and returns the
    /// updated wrapper.
    pub fn vector(self, name: &str, dimensions: usize) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.vector(name, dimensions),
            rt,
        }
    }

    /// Calls the wrapped builder's `done` and re-wraps the resulting schema builder.
    pub fn done(self) -> SchemaBuilderSync<'a> {
        let Self { inner, rt } = self;
        SchemaBuilderSync {
            inner: inner.done(),
            rt,
        }
    }

    /// Shorthand for `done()` followed by `label(name)` on the schema wrapper.
    pub fn label(self, name: &str) -> LabelBuilderSync<'a> {
        let schema = self.done();
        schema.label(name)
    }

    /// Consumes the builder and blocks on the wrapped `apply`.
    pub fn apply(self) -> Result<()> {
        let Self { inner, rt } = self;
        rt.block_on(inner.apply())
    }
}
/// Blocking wrapper around the async `EdgeTypeBuilder`.
pub struct EdgeTypeBuilderSync<'a> {
/// The wrapped async edge-type builder.
inner: crate::api::schema::EdgeTypeBuilder<'a>,
/// Borrowed runtime used to run `apply`; passed back to the schema wrapper by `done`.
rt: &'a tokio::runtime::Runtime,
}
impl<'a> EdgeTypeBuilderSync<'a> {
    /// Passes `name` and `data_type` to the wrapped builder's `property` and returns the
    /// updated wrapper.
    pub fn property(self, name: &str, data_type: DataType) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.property(name, data_type),
            rt,
        }
    }

    /// Passes `name` and `data_type` to the wrapped builder's `property_nullable` and
    /// returns the updated wrapper.
    pub fn property_nullable(self, name: &str, data_type: DataType) -> Self {
        let Self { inner, rt } = self;
        Self {
            inner: inner.property_nullable(name, data_type),
            rt,
        }
    }

    /// Calls the wrapped builder's `done` and re-wraps the resulting schema builder.
    pub fn done(self) -> SchemaBuilderSync<'a> {
        let Self { inner, rt } = self;
        SchemaBuilderSync {
            inner: inner.done(),
            rt,
        }
    }

    /// Consumes the builder and blocks on the wrapped `apply`.
    pub fn apply(self) -> Result<()> {
        let Self { inner, rt } = self;
        rt.block_on(inner.apply())
    }
}