use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use async_trait::async_trait;
use boltr::error::BoltError;
use boltr::server::{
AuthInfo, BoltBackend, BoltRecord, ResultMetadata, ResultStream, RoutingTable, SessionConfig,
SessionHandle, SessionProperty, TransactionHandle,
};
use boltr::types::{BoltDict, BoltValue};
use kglite::api::{cypher, DirGraph, Value};
use crate::error_map::kg_to_bolt;
use crate::value_adapter;
/// Bolt backend state for a single kglite graph: one shared session plus the
/// bookkeeping for Bolt sessions and explicit transactions opened against it.
pub struct KgliteBackend {
/// Shared kglite session created from the graph passed to `new`.
session: Arc<kglite::api::session::Session>,
/// When set, `begin_transaction` and mutation queries are rejected.
readonly: bool,
/// Open explicit transactions, keyed by the transaction handle string.
transactions: Arc<Mutex<HashMap<String, Arc<Mutex<TxState>>>>>,
/// Source of the numeric suffix in `bolt-{id}` session handles.
session_counter: AtomicU64,
/// Source of the numeric suffix in `tx-{id}` transaction handles.
tx_counter: AtomicU64,
/// Address reported for every role in the routing table built by `route`.
advertised_addr: String,
}
/// Per-transaction bookkeeping stored in `KgliteBackend::transactions`.
struct TxState {
/// The live kglite transaction; taken (leaving `None`) when it is committed
/// or rolled back.
inner: Option<kglite::api::session::Transaction>,
/// Handle string of the session that began the transaction; `commit` and
/// `rollback` compare it against the calling session.
session_id: String,
}
impl KgliteBackend {
    /// Builds a backend that serves `graph`.
    ///
    /// `readonly` makes the backend reject explicit transactions and mutation
    /// queries; `advertised_addr` is the address handed out in routing tables.
    /// Both id counters start at zero and the transaction table starts empty.
    pub fn new(graph: DirGraph, readonly: bool, advertised_addr: String) -> Self {
        let session = Arc::new(kglite::api::session::Session::new(graph));
        let transactions = Arc::new(Mutex::new(HashMap::new()));
        let session_counter = AtomicU64::new(0);
        let tx_counter = AtomicU64::new(0);

        Self {
            session,
            readonly,
            transactions,
            session_counter,
            tx_counter,
            advertised_addr,
        }
    }
}
#[async_trait]
impl BoltBackend for KgliteBackend {
/// Allocates the next session handle (`bolt-{n}`, with `n` taken from the
/// session counter) and logs the client's user agent and requested database.
async fn create_session(&self, config: &SessionConfig) -> Result<SessionHandle, BoltError> {
    let session_id = format!(
        "bolt-{}",
        self.session_counter.fetch_add(1, Ordering::Relaxed)
    );
    tracing::debug!(
        session_id = %session_id,
        user_agent = %config.user_agent,
        database = ?config.database,
        "create_session"
    );
    Ok(SessionHandle(session_id))
}
async fn set_session_auth(
&self,
session: &SessionHandle,
auth_info: AuthInfo,
) -> Result<(), BoltError> {
tracing::debug!(
session_id = %session.0,
principal = %auth_info.principal,
"set_session_auth (no-op until C.6)"
);
Ok(())
}
/// Discards every open transaction that belongs to `session`.
///
/// The scan and the removal happen in one pass under a single acquisition of
/// the transaction-table lock, so a transaction cannot slip in between
/// "find" and "remove". Always returns `Ok(())`.
///
/// NOTE(review): removed transactions are dropped rather than handed to
/// `Session::rollback` as the `rollback` method does — confirm that dropping
/// a `Transaction` is sufficient.
async fn close_session(&self, session: &SessionHandle) -> Result<(), BoltError> {
    let mut rolled_back = 0usize;
    {
        let mut txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
        txs.retain(|handle, state_arc| {
            // Scope the per-transaction guard so it is released before logging.
            let owned_by_session = {
                let state = state_arc.lock().unwrap_or_else(|p| p.into_inner());
                state.session_id == session.0
            };
            if owned_by_session {
                rolled_back += 1;
                tracing::debug!(
                    session_id = %session.0,
                    tx = %handle,
                    "rolled back in-flight transaction on session close"
                );
            }
            // Keep only the transactions owned by other sessions.
            !owned_by_session
        });
    }
    tracing::debug!(
        session_id = %session.0,
        rolled_back,
        "close_session"
    );
    Ok(())
}
/// Accepts a session property without acting on it: the only property,
/// `Database`, is logged and otherwise ignored.
async fn configure_session(
    &self,
    session: &SessionHandle,
    property: SessionProperty,
) -> Result<(), BoltError> {
    // `Database` is the sole variant, so the pattern is irrefutable.
    let SessionProperty::Database(db) = property;
    tracing::debug!(
        session_id = %session.0,
        database = %db,
        "configure_session: database property accepted but ignored (single-graph server)"
    );
    Ok(())
}
/// Discards every open transaction that belongs to `session`, leaving the
/// session itself usable.
///
/// The scan and the removal happen in one pass under a single acquisition of
/// the transaction-table lock. Always returns `Ok(())`.
async fn reset_session(&self, session: &SessionHandle) -> Result<(), BoltError> {
    let mut rolled_back = 0usize;
    {
        let mut txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
        txs.retain(|_handle, state_arc| {
            let owned_by_session = {
                let state = state_arc.lock().unwrap_or_else(|p| p.into_inner());
                state.session_id == session.0
            };
            if owned_by_session {
                rolled_back += 1;
            }
            // Keep only the transactions owned by other sessions.
            !owned_by_session
        });
    }
    tracing::debug!(
        session_id = %session.0,
        rolled_back,
        "reset_session"
    );
    Ok(())
}
/// Runs one Cypher statement, either inside `transaction` or as an
/// auto-commit read, and returns the fully materialised result.
///
/// # Errors
/// * `BoltError::Protocol` for an empty query or one that appears to contain
///   more than one statement.
/// * `BoltError::Transaction` when `transaction` is open but was begun by a
///   different session (the same rule `commit` and `rollback` enforce).
/// * Whatever parameter conversion, query execution, or result conversion
///   report.
///
/// The summary carries `type` (`"r"` or `"w"`), `t_last` (elapsed
/// milliseconds) and, when the engine reports them, a `stats` dictionary.
async fn execute(
    &self,
    session: &SessionHandle,
    query: &str,
    parameters: &HashMap<String, BoltValue>,
    _extra: &BoltDict,
    transaction: Option<&TransactionHandle>,
) -> Result<ResultStream, BoltError> {
    let trimmed = query.trim();
    if trimmed.is_empty() {
        return Err(BoltError::Protocol(
            "empty Cypher query — RUN requires a non-empty statement".into(),
        ));
    }
    if _query_appears_multi_statement(trimmed) {
        return Err(BoltError::Protocol(
            "multi-statement queries not supported — send one Cypher \
statement per RUN message (or open a transaction and \
issue separate RUNs)"
                .into(),
        ));
    }

    let kg_params: HashMap<String, Value> = parameters
        .iter()
        .map(|(k, v)| value_adapter::from_bolt(v).map(|kv| (k.clone(), kv)))
        .collect::<Result<HashMap<_, _>, _>>()?;

    // A session may only run statements in transactions it began itself.
    // Unknown handles fall through so `execute_in_tx` reports them.
    if let Some(tx) = transaction {
        let known = {
            let txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
            txs.get(&tx.0).cloned()
        };
        if let Some(state_arc) = known {
            let owner_matches = state_arc
                .lock()
                .unwrap_or_else(|p| p.into_inner())
                .session_id
                == session.0;
            if !owner_matches {
                return Err(BoltError::Transaction(format!(
                    "execute: transaction {} doesn't belong to session {}",
                    tx.0, session.0
                )));
            }
        }
    }

    let started = Instant::now();
    let (result, type_str) = match transaction {
        Some(tx) => self.execute_in_tx(&tx.0, query, kg_params)?,
        None => self.execute_auto_commit(query, kg_params)?,
    };
    // `as_millis` is u128; saturate instead of silently truncating.
    let elapsed_ms = i64::try_from(started.elapsed().as_millis()).unwrap_or(i64::MAX);

    let records: Vec<BoltRecord> = result
        .rows
        .iter()
        .map(|row| {
            row.iter()
                .map(value_adapter::to_bolt)
                .collect::<Result<Vec<_>, _>>()
                .map(|values| BoltRecord { values })
        })
        .collect::<Result<Vec<_>, _>>()?;

    let mut summary = BoltDict::from([
        ("type".to_string(), BoltValue::String(type_str.to_string())),
        ("t_last".to_string(), BoltValue::Integer(elapsed_ms)),
    ]);
    if let Some(stats) = &result.stats {
        let stats_dict = BoltDict::from([
            (
                "nodes-created".to_string(),
                BoltValue::Integer(stats.nodes_created as i64),
            ),
            (
                "nodes-deleted".to_string(),
                BoltValue::Integer(stats.nodes_deleted as i64),
            ),
            (
                "relationships-created".to_string(),
                BoltValue::Integer(stats.relationships_created as i64),
            ),
            (
                "relationships-deleted".to_string(),
                BoltValue::Integer(stats.relationships_deleted as i64),
            ),
            (
                "properties-set".to_string(),
                BoltValue::Integer(stats.properties_set as i64),
            ),
        ]);
        summary.insert("stats".to_string(), BoltValue::Dict(stats_dict));
    }

    Ok(ResultStream {
        metadata: ResultMetadata {
            columns: result.columns,
            extra: BoltDict::new(),
        },
        records,
        summary,
    })
}
/// Opens an explicit transaction for `session` and returns its handle
/// (`tx-{n}`, with `n` taken from the transaction counter).
///
/// # Errors
/// `BoltError::Forbidden` when the backend is read-only.
async fn begin_transaction(
    &self,
    session: &SessionHandle,
    _extra: &BoltDict,
) -> Result<TransactionHandle, BoltError> {
    if self.readonly {
        return Err(BoltError::Forbidden(
            "server is read-only — explicit transactions rejected (--readonly flag)".into(),
        ));
    }

    let tx_id = format!("tx-{}", self.tx_counter.fetch_add(1, Ordering::Relaxed));
    let entry = Arc::new(Mutex::new(TxState {
        inner: Some(self.session.begin()),
        session_id: session.0.clone(),
    }));
    // The guard is a temporary, so the table lock is released at the end of
    // this statement.
    self.transactions
        .lock()
        .unwrap_or_else(|p| p.into_inner())
        .insert(tx_id.clone(), entry);

    tracing::debug!(
        session_id = %session.0,
        tx = %tx_id,
        "begin_transaction"
    );
    Ok(TransactionHandle(tx_id))
}
async fn commit(
&self,
session: &SessionHandle,
transaction: &TransactionHandle,
) -> Result<BoltDict, BoltError> {
let state_arc = {
let mut txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
txs.remove(&transaction.0).ok_or_else(|| {
BoltError::Transaction(format!(
"commit: unknown transaction handle: {}",
transaction.0
))
})?
};
let mut state = match Arc::try_unwrap(state_arc) {
Ok(mutex) => mutex.into_inner().unwrap_or_else(|p| p.into_inner()),
Err(arc) => {
let guard = arc.lock().unwrap_or_else(|p| p.into_inner());
TxState {
inner: None,
session_id: guard.session_id.clone(),
}
}
};
if state.session_id != session.0 {
let mut txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
txs.insert(transaction.0.clone(), Arc::new(Mutex::new(state)));
return Err(BoltError::Transaction(format!(
"commit: transaction {} doesn't belong to session {}",
transaction.0, session.0
)));
}
let Some(tx) = state.inner.take() else {
return Ok(BoltDict::new());
};
match self.session.commit(tx, true) {
kglite::api::session::CommitOutcome::NoWritesNoOp => {
tracing::debug!(
session_id = %session.0,
tx = %transaction.0,
"commit (no-op; no mutations)"
);
}
kglite::api::session::CommitOutcome::Committed { new_version } => {
tracing::debug!(
session_id = %session.0,
tx = %transaction.0,
new_version,
"commit (with mutations)"
);
}
kglite::api::session::CommitOutcome::ConflictDetected {
current_version,
base_version,
} => {
tracing::debug!(
session_id = %session.0,
tx = %transaction.0,
current_version,
base_version,
"commit conflict — another writer committed first"
);
return Err(BoltError::Transaction(format!(
"Transaction conflict: graph was modified by another committer \
since this transaction's BEGIN (base version {base_version}, \
current version {current_version}). Retry the transaction."
)));
}
}
Ok(BoltDict::new())
}
/// Rolls back `transaction` on behalf of `session` and removes it from the
/// transaction table.
///
/// The transaction is taken out of its shared state under the state's own
/// lock (recovering from poisoning like the rest of this backend), so the
/// session-level rollback runs even when another holder still references
/// the state.
///
/// # Errors
/// `BoltError::Transaction` when the handle is unknown or the transaction
/// was begun by a different session (it is then left in the table
/// untouched).
async fn rollback(
    &self,
    session: &SessionHandle,
    transaction: &TransactionHandle,
) -> Result<(), BoltError> {
    let state_arc = {
        let mut txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
        txs.remove(&transaction.0).ok_or_else(|| {
            BoltError::Transaction(format!(
                "rollback: unknown transaction handle: {}",
                transaction.0
            ))
        })?
    };

    let (owner_matches, had_mutations) = {
        let state = state_arc.lock().unwrap_or_else(|p| p.into_inner());
        (
            state.session_id == session.0,
            state.inner.as_ref().is_some_and(|t| t.has_writes()),
        )
    };
    if !owner_matches {
        // Put the very same state back so its real owner can still use it.
        self.transactions
            .lock()
            .unwrap_or_else(|p| p.into_inner())
            .insert(transaction.0.clone(), state_arc);
        return Err(BoltError::Transaction(format!(
            "rollback: transaction {} doesn't belong to session {}",
            transaction.0, session.0
        )));
    }

    // Locking (rather than `Arc::try_unwrap`) guarantees the rollback is not
    // skipped when the state is still shared or the mutex was poisoned.
    let pending = state_arc
        .lock()
        .unwrap_or_else(|p| p.into_inner())
        .inner
        .take();
    if let Some(tx) = pending {
        self.session.rollback(tx);
    }

    tracing::debug!(
        session_id = %session.0,
        tx = %transaction.0,
        had_mutations = had_mutations,
        "rollback"
    );
    Ok(())
}
/// Describes this server: a `server` product string of the form
/// `kglite-bolt-server/{crate version}` and a `bolt_agent` dictionary
/// holding that product string and the bare version.
async fn get_server_info(&self) -> Result<BoltDict, BoltError> {
    let version = env!("CARGO_PKG_VERSION");
    let product = format!("kglite-bolt-server/{version}");

    let mut bolt_agent = BoltDict::new();
    bolt_agent.insert("product".to_string(), BoltValue::String(product.clone()));
    bolt_agent.insert(
        "version".to_string(),
        BoltValue::String(version.to_string()),
    );

    let mut info = BoltDict::new();
    info.insert("server".to_string(), BoltValue::String(product));
    info.insert("bolt_agent".to_string(), BoltValue::Dict(bolt_agent));
    Ok(info)
}
/// Builds a routing table in which the advertised address fills the
/// `WRITE`, `READ` and `ROUTE` roles, in that order.
///
/// The table's database name is `db`, or `"neo4j"` when none is given, and
/// its TTL is 300.
async fn route(
    &self,
    _routing_context: &BoltDict,
    _bookmarks: &[String],
    db: Option<&str>,
) -> Result<RoutingTable, BoltError> {
    let servers = ["WRITE", "READ", "ROUTE"]
        .iter()
        .map(|role| boltr::server::RoutingServer {
            addresses: vec![self.advertised_addr.clone()],
            role: (*role).to_string(),
        })
        .collect();

    Ok(RoutingTable {
        ttl: 300,
        db: db.unwrap_or("neo4j").to_string(),
        servers,
    })
}
}
/// Heuristically reports whether `query` holds more than one statement.
///
/// Returns `true` when anything other than whitespace or comments follows
/// the first top-level `;`. A `;` is *not* top-level when it sits inside:
/// * a single- or double-quoted string (backslash escapes the next char),
/// * a backtick-quoted identifier (a doubled backtick stays inside it),
/// * a `//` line comment or a `/* ... */` block comment.
///
/// A single trailing `;`, optionally followed by whitespace or comments, is
/// therefore accepted as one statement.
fn _query_appears_multi_statement(query: &str) -> bool {
    let mut chars = query.chars().peekable();
    let mut in_quote: Option<char> = None;
    let mut after_terminator = false;

    while let Some(c) = chars.next() {
        // Inside a quoted region nothing but the closing delimiter matters.
        if let Some(q) = in_quote {
            match c {
                // Backticks have no backslash escapes; strings do.
                '\\' if q != '`' => {
                    chars.next();
                }
                _ if c == q => in_quote = None,
                _ => {}
            }
            continue;
        }

        // Comments never count as statement content.
        if c == '/' {
            match chars.peek().copied() {
                Some('/') => {
                    for n in chars.by_ref() {
                        if n == '\n' {
                            break;
                        }
                    }
                    continue;
                }
                Some('*') => {
                    chars.next(); // consume the '*' that opens the comment
                    let mut prev = '\0';
                    for n in chars.by_ref() {
                        if prev == '*' && n == '/' {
                            break;
                        }
                        prev = n;
                    }
                    continue;
                }
                _ => {}
            }
        }

        if after_terminator {
            // Any real content after the first `;` means a second statement.
            if !c.is_whitespace() {
                return true;
            }
            continue;
        }

        match c {
            '\'' | '"' | '`' => in_quote = Some(c),
            ';' => after_terminator = true,
            _ => {}
        }
    }
    false
}
impl KgliteBackend {
/// Builds the execution options used for every statement: the caller's
/// parameters, with no deadline, row cap, disabled passes or embedder, and
/// with lazy evaluation switched off.
fn execute_opts<'a>(
    &self,
    kg_params: &'a HashMap<String, Value>,
) -> kglite::api::session::ExecuteOptions<'a> {
    use kglite::api::session::ExecuteOptions;

    ExecuteOptions {
        params: kg_params,
        deadline: None,
        max_rows: None,
        lazy_eligible: false,
        disabled_passes: None,
        embedder: None,
    }
}
/// Runs `query` outside any explicit transaction, against a fresh snapshot
/// of the session. Only reads are allowed here.
///
/// # Errors
/// * The parse error, mapped through `kg_to_bolt`.
/// * `BoltError::Forbidden` for a mutation on a read-only server.
/// * `BoltError::Backend` for a mutation on a writable server (mutations
///   must go through an explicit transaction).
/// * The execution error, mapped through `kg_to_bolt`.
///
/// On success the result is paired with the summary type `"r"`.
fn execute_auto_commit(
    &self,
    query: &str,
    kg_params: HashMap<String, Value>,
) -> Result<(cypher::CypherResult, &'static str), BoltError> {
    let parsed = cypher::parse_cypher(query).map_err(kg_to_bolt)?;

    if cypher::is_mutation_query(&parsed) {
        let rejection = if self.readonly {
            BoltError::Forbidden(
                "server is read-only — mutations rejected (--readonly flag)".into(),
            )
        } else {
            BoltError::Backend(
                "auto-commit mutations not supported by kglite-bolt-server — \
wrap CREATE/SET/DELETE in an explicit transaction \
(session.begin_transaction)"
                    .into(),
            )
        };
        return Err(rejection);
    }

    let snapshot = self.session.snapshot();
    let opts = self.execute_opts(&kg_params);
    kglite::api::session::execute_read(&snapshot, query, &opts)
        .map_err(kg_to_bolt)
        .map(|outcome| (outcome.result, "r"))
}
/// Runs `query` inside the open transaction identified by `handle`.
///
/// The transaction's state lock is held for the whole call. Mutations run
/// against the transaction's working graph and yield summary type `"w"`;
/// reads run against its current view and yield `"r"`.
///
/// # Errors
/// Checked in this order: unknown handle, transaction already finished,
/// parse failure, mutation on a read-only server, then any failure reported
/// while obtaining the graph or executing the statement.
fn execute_in_tx(
    &self,
    handle: &str,
    query: &str,
    kg_params: HashMap<String, Value>,
) -> Result<(cypher::CypherResult, &'static str), BoltError> {
    // The table guard is a temporary: it is released before the
    // per-transaction lock is taken.
    let shared_state: Arc<Mutex<TxState>> = self
        .transactions
        .lock()
        .unwrap_or_else(|p| p.into_inner())
        .get(handle)
        .cloned()
        .ok_or_else(|| BoltError::Transaction(format!("unknown transaction handle: {handle}")))?;

    let mut guard = shared_state.lock().unwrap_or_else(|p| p.into_inner());
    let Some(tx) = guard.inner.as_mut() else {
        return Err(BoltError::Transaction(format!(
            "tx {handle} already committed or rolled back"
        )));
    };

    let parsed = cypher::parse_cypher(query).map_err(kg_to_bolt)?;
    let mutates = cypher::is_mutation_query(&parsed);
    if mutates && self.readonly {
        return Err(BoltError::Forbidden(
            "server is read-only — mutations rejected (--readonly flag)".into(),
        ));
    }

    let opts = self.execute_opts(&kg_params);
    if !mutates {
        let graph = tx.current().ok_or_else(|| {
            BoltError::Backend(format!(
                "tx {handle} lost its graph view mid-read — bolt-server internal bug"
            ))
        })?;
        let read = kglite::api::session::execute_read(graph, query, &opts).map_err(kg_to_bolt)?;
        return Ok((read.result, "r"));
    }

    let working = tx.working_mut().map_err(kg_to_bolt)?;
    let written = kglite::api::session::execute_mut(working, query, &opts).map_err(kg_to_bolt)?;
    Ok((written.result, "w"))
}
}