pub mod auth;
#[cfg(feature = "gui")]
pub mod http;
pub mod protocol;
pub mod registry;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::OwnedMutexGuard;
use auth::{verify_password, ServerConfig};
use protocol::{row_to_json, ClientMessage, Request, Response, ServerMessage};
use registry::{GraphRegistry, GraphState};
/// Runs the minigdb line-protocol server on `addr` until an accept error occurs
/// or the process is terminated.
///
/// * `config` – server configuration (authentication flag and user list), shared
///   with every connection.
/// * `graphs_dir` – directory handed to [`GraphRegistry::new`].
/// * `addr` – TCP address the line-protocol listener binds to.
/// * `gui_addr` – when `Some` and the crate is built with the `gui` feature, the
///   HTTP server is started on this address; otherwise the value is ignored.
///
/// # Errors
///
/// Returns the I/O error from binding the listener or from `accept`.
pub async fn serve(
    config: ServerConfig,
    graphs_dir: PathBuf,
    addr: SocketAddr,
    gui_addr: Option<SocketAddr>,
) -> std::io::Result<()> {
    let registry = GraphRegistry::new(graphs_dir);
    let listener = TcpListener::bind(addr).await?;
    eprintln!("minigdb listening on {addr} (Ctrl-C to stop)");

    // Opening the well-known graphs up front is best-effort: a failure is
    // reported but does not prevent the server from starting.
    if let Err(e) = registry.get_or_open("default").await {
        eprintln!("Warning: could not open default graph: {e}");
    }
    if let Err(e) = registry.get_or_open(registry::META_GRAPH).await {
        eprintln!("Warning: could not open _meta graph: {e}");
    }

    // Ctrl-C: checkpoint every graph, then exit the process.
    let registry_shutdown = Arc::clone(&registry);
    tokio::spawn(async move {
        if let Err(e) = tokio::signal::ctrl_c().await {
            // If the signal listener cannot be installed we must NOT fall
            // through to the shutdown path, or the server would exit
            // immediately after starting.
            eprintln!("Warning: could not listen for Ctrl-C: {e}");
            return;
        }
        eprintln!("\nShutting down — checkpointing all graphs…");
        registry_shutdown.checkpoint_all().await;
        std::process::exit(0);
    });

    let config = Arc::new(config);

    #[cfg(feature = "gui")]
    if let Some(gui_addr) = gui_addr {
        let reg = Arc::clone(&registry);
        let cfg = Arc::clone(&config);
        tokio::spawn(async move {
            if let Err(e) = http::serve(gui_addr, reg, cfg).await {
                eprintln!("GUI server error: {e}");
            }
        });
    }
    #[cfg(not(feature = "gui"))]
    let _ = gui_addr;

    // One task per connection; a connection-level error is logged and only
    // terminates that connection.
    loop {
        let (stream, peer) = listener.accept().await?;
        eprintln!("[{peer}] connected");
        let registry = Arc::clone(&registry);
        let config = Arc::clone(&config);
        tokio::spawn(async move {
            if let Err(e) = handle(stream, peer, registry, config).await {
                eprintln!("[{peer}] error: {e}");
            }
            eprintln!("[{peer}] disconnected");
        });
    }
}
/// Per-connection session state threaded through the request handlers.
struct ConnectionState {
/// Name of the user this connection runs as (`"anonymous"` when the server
/// does not require authentication).
user: String,
/// Graph that requests without an explicit `graph` field are executed against;
/// starts out as `"default"`.
current_graph: String,
/// Lock guard on the graph state, held for as long as this connection has a
/// transaction open (stored on `BEGIN`, taken on `COMMIT`/`ROLLBACK` and when
/// the connection ends). `None` when no transaction is open.
txn_lock: Option<OwnedMutexGuard<GraphState>>,
}
/// Serves a single client connection: greeting, optional authentication, then
/// one request per line until the peer disconnects.
///
/// When authentication is required the first non-empty line must be a valid
/// `auth` message; anything else (or bad credentials) gets an `AuthFail` reply
/// and the connection is closed. Without authentication the session runs as
/// `"anonymous"`.
///
/// A transaction still open when the connection ends is rolled back, regardless
/// of whether the connection ended cleanly or with an I/O error.
///
/// # Errors
///
/// Returns any I/O error raised while reading from or writing to the socket.
async fn handle(
    stream: TcpStream,
    _peer: SocketAddr,
    registry: Arc<GraphRegistry>,
    config: Arc<ServerConfig>,
) -> std::io::Result<()> {
    let (read_half, mut write_half) = stream.into_split();
    let mut lines = BufReader::new(read_half).lines();

    send_msg(
        &mut write_half,
        &ServerMessage::Hello {
            version: "2",
            auth_required: config.server.auth_required,
        },
    )
    .await?;

    let user = if config.server.auth_required {
        loop {
            let line = match lines.next_line().await? {
                Some(l) => l,
                // Peer hung up before authenticating.
                None => return Ok(()),
            };
            let line = line.trim();
            if line.is_empty() {
                continue;
            }
            match serde_json::from_str::<ClientMessage>(line) {
                Ok(ClientMessage::Auth { user, password }) => match config.find_user(&user) {
                    Some(entry) if verify_password(&password, &entry.password_hash) => {
                        send_msg(
                            &mut write_half,
                            &ServerMessage::AuthOk { user: user.clone() },
                        )
                        .await?;
                        break user;
                    }
                    // Unknown user and wrong password are deliberately
                    // answered with the same message.
                    _ => {
                        send_msg(
                            &mut write_half,
                            &ServerMessage::AuthFail {
                                error: "invalid credentials".to_string(),
                            },
                        )
                        .await?;
                        return Ok(());
                    }
                },
                _ => {
                    send_msg(
                        &mut write_half,
                        &ServerMessage::AuthFail {
                            error: "expected auth message".to_string(),
                        },
                    )
                    .await?;
                    return Ok(());
                }
            }
        }
    } else {
        "anonymous".to_string()
    };

    let mut state = ConnectionState {
        user,
        current_graph: "default".to_string(),
        txn_lock: None,
    };

    // The loop yields its outcome instead of using `?` so that the rollback
    // below also runs when the connection dies with an I/O error.
    let result: std::io::Result<()> = loop {
        let line = match lines.next_line().await {
            Ok(Some(line)) => line,
            Ok(None) => break Ok(()),
            Err(e) => break Err(e),
        };
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        if let Err(e) = dispatch_line(line, &mut state, &registry, &config, &mut write_half).await
        {
            break Err(e);
        }
    };

    // Never leave a transaction open behind a dead connection; the rollback
    // result is ignored because there is nobody left to report it to.
    if let Some(mut guard) = state.txn_lock.take() {
        let (graph, _) = &mut *guard;
        let _ = graph.rollback_transaction();
    }

    result
}
/// Parses one non-empty input line and routes it to the matching handler.
///
/// A JSON object carrying a `type` key is decoded as a [`ClientMessage`]; any
/// other JSON value is decoded as a query [`Request`]. Lines that fail to parse
/// at either stage are answered with an error [`Response`] whose id is `0`.
///
/// # Errors
///
/// Propagates I/O errors from writing the reply (directly or via the handler).
async fn dispatch_line<W: AsyncWriteExt + Unpin>(
    line: &str,
    state: &mut ConnectionState,
    registry: &Arc<GraphRegistry>,
    config: &Arc<ServerConfig>,
    write: &mut W,
) -> std::io::Result<()> {
    let value = match serde_json::from_str::<serde_json::Value>(line) {
        Ok(value) => value,
        Err(e) => {
            let failure =
                Response::err(0, format!("invalid JSON: {e}"), std::time::Duration::ZERO);
            return send(write, &failure).await;
        }
    };

    // Successful decodes return straight from the handler; only the failure
    // text falls through to the shared error reply below.
    let problem = if value.get("type").is_some() {
        match serde_json::from_value::<ClientMessage>(value) {
            Ok(msg) => return handle_client_message(msg, state, registry, config, write).await,
            Err(e) => format!("invalid message: {e}"),
        }
    } else {
        match serde_json::from_value::<Request>(value) {
            Ok(req) => return handle_query(req, state, registry, config, write).await,
            Err(e) => format!("invalid request: {e}"),
        }
    };

    let failure = Response::err(0, problem, std::time::Duration::ZERO);
    send(write, &failure).await
}
async fn handle_client_message<W: AsyncWriteExt + Unpin>(
msg: ClientMessage,
_state: &mut ConnectionState,
registry: &Arc<GraphRegistry>,
_config: &Arc<ServerConfig>,
write: &mut W,
) -> std::io::Result<()> {
match msg {
ClientMessage::Auth { .. } => {
send_msg(write, &ServerMessage::AdminFail {
error: "already authenticated".to_string(),
})
.await
}
ClientMessage::Admin { cmd, name } => match cmd.as_str() {
"graphs" => {
let graphs = registry.list().await;
send_msg(
write,
&ServerMessage::AdminOk {
data: serde_json::json!({ "graphs": graphs }),
},
)
.await
}
"stats" => {
let open = registry.list().await;
send_msg(
write,
&ServerMessage::AdminOk {
data: serde_json::json!({ "open_graphs": open }),
},
)
.await
}
"create" => match name.as_deref() {
None => {
send_msg(
write,
&ServerMessage::AdminFail {
error: "create requires 'name'".to_string(),
},
)
.await
}
Some(n) => match registry.create(n).await {
Ok(()) => {
send_msg(write, &ServerMessage::AdminOk { data: serde_json::json!({}) })
.await
}
Err(e) => {
send_msg(
write,
&ServerMessage::AdminFail {
error: e.to_string(),
},
)
.await
}
},
},
"drop" => match name.as_deref() {
None => {
send_msg(
write,
&ServerMessage::AdminFail {
error: "drop requires 'name'".to_string(),
},
)
.await
}
Some(n) if n.starts_with('_') => {
send_msg(
write,
&ServerMessage::AdminFail {
error: format!("cannot drop system graph '{n}'"),
},
)
.await
}
Some(n) => match registry.drop_graph(n).await {
Ok(()) => {
send_msg(write, &ServerMessage::AdminOk { data: serde_json::json!({}) })
.await
}
Err(e) => {
send_msg(
write,
&ServerMessage::AdminFail {
error: e.to_string(),
},
)
.await
}
},
},
other => {
send_msg(
write,
&ServerMessage::AdminFail {
error: format!("unknown admin command '{other}'"),
},
)
.await
}
},
}
}
/// Executes one query request and writes exactly one [`Response`].
///
/// The target graph is `req.graph` when present, otherwise the connection's
/// current graph. The request is validated *before* the connection state is
/// touched:
///
/// 1. Switching to a different graph while a transaction is open is rejected,
///    because statements inside a transaction always run against the graph
///    whose lock the connection holds.
/// 2. With authentication enabled, the user must be known and allowed to
///    access the target graph; an unknown user is denied.
///
/// Only then does the target become the connection's current graph.
///
/// `BEGIN`, `COMMIT` and `ROLLBACK` (ASCII case-insensitive, trailing `;`
/// allowed) control the connection's transaction; every other statement is
/// executed either inside the open transaction or under a short-lived lock on
/// the current graph.
///
/// # Errors
///
/// Propagates I/O errors from writing the response. Query and transaction
/// failures are reported to the client inside the response instead.
async fn handle_query<W: AsyncWriteExt + Unpin>(
    req: Request,
    state: &mut ConnectionState,
    registry: &Arc<GraphRegistry>,
    config: &Arc<ServerConfig>,
    write: &mut W,
) -> std::io::Result<()> {
    let target: &String = req.graph.as_ref().unwrap_or(&state.current_graph);

    if state.txn_lock.is_some() && *target != state.current_graph {
        let resp = Response::err(
            req.id,
            format!(
                "cannot switch to graph '{}' while a transaction is open on graph '{}'",
                target, state.current_graph
            ),
            std::time::Duration::ZERO,
        );
        return send(write, &resp).await;
    }

    if config.server.auth_required {
        // Fail closed: a user missing from the configuration has no access.
        let allowed = config
            .find_user(&state.user)
            .map_or(false, |entry| entry.can_access(target));
        if !allowed {
            let resp = Response::err(
                req.id,
                format!(
                    "user '{}' does not have access to graph '{}'",
                    state.user, target
                ),
                std::time::Duration::ZERO,
            );
            return send(write, &resp).await;
        }
    }

    // The request is acceptable, so the graph switch may now take effect.
    if let Some(requested) = &req.graph {
        state.current_graph.clone_from(requested);
    }

    let start = Instant::now();
    let id = req.id;
    // Compare the keyword in place rather than upper-casing the whole query.
    let statement = req.query.trim().trim_end_matches(';').trim();

    let resp = if statement.eq_ignore_ascii_case("BEGIN") {
        if state.txn_lock.is_some() {
            Response::err(id, "transaction already open".to_string(), start.elapsed())
        } else {
            match registry.get_or_open(&state.current_graph).await {
                Err(e) => Response::err(id, e.to_string(), start.elapsed()),
                Ok(arc) => {
                    // The owned guard is parked in the connection state so the
                    // graph stays locked for the whole transaction.
                    let mut guard = arc.lock_owned().await;
                    let (graph, _) = &mut *guard;
                    match graph.begin_transaction() {
                        Ok(()) => {
                            state.txn_lock = Some(guard);
                            Response::ok(id, vec![], start.elapsed())
                        }
                        Err(e) => Response::err(id, e.to_string(), start.elapsed()),
                    }
                }
            }
        }
    } else if statement.eq_ignore_ascii_case("COMMIT") {
        // Taking the guard releases the graph lock when this branch ends,
        // whether or not the commit succeeds.
        if let Some(mut guard) = state.txn_lock.take() {
            let (graph, _) = &mut *guard;
            match graph.commit_transaction() {
                Ok(()) => Response::ok(id, vec![], start.elapsed()),
                Err(e) => Response::err(id, e.to_string(), start.elapsed()),
            }
        } else {
            Response::err(id, "no active transaction".to_string(), start.elapsed())
        }
    } else if statement.eq_ignore_ascii_case("ROLLBACK") {
        if let Some(mut guard) = state.txn_lock.take() {
            let (graph, _) = &mut *guard;
            match graph.rollback_transaction() {
                Ok(()) => Response::ok(id, vec![], start.elapsed()),
                Err(e) => Response::err(id, e.to_string(), start.elapsed()),
            }
        } else {
            Response::err(id, "no active transaction".to_string(), start.elapsed())
        }
    } else if let Some(ref mut guard) = state.txn_lock {
        // Inside a transaction: reuse the lock this connection already holds.
        let (graph, txn_id) = &mut **guard;
        execute_and_build_response(id, &req.query, graph, txn_id, start)
    } else {
        match registry.get_or_open(&state.current_graph).await {
            Err(e) => Response::err(id, e.to_string(), start.elapsed()),
            Ok(arc) => {
                let mut guard = arc.lock().await;
                let (graph, txn_id) = &mut *guard;
                execute_and_build_response(id, &req.query, graph, txn_id, start)
            }
        }
    };

    send(write, &resp).await
}
fn execute_and_build_response(
id: u64,
query: &str,
graph: &mut crate::Graph,
txn_id: &mut u64,
start: Instant,
) -> Response {
match crate::query_capturing(query, graph, txn_id) {
Ok((rows, _ops)) => {
let json_rows: Vec<_> = rows.iter().map(row_to_json).collect();
Response::ok(id, json_rows, start.elapsed())
}
Err(e) => Response::err(id, e.to_string(), start.elapsed()),
}
}
/// Writes `resp` to `w` as a single line of JSON terminated by `\n`.
///
/// # Panics
///
/// Panics if `resp` cannot be serialized.
async fn send<W: AsyncWriteExt + Unpin>(w: &mut W, resp: &Response) -> std::io::Result<()> {
    let mut frame = serde_json::to_vec(resp).expect("Response is always serializable");
    frame.push(b'\n');
    w.write_all(&frame).await
}
/// Writes `msg` to `w` as a single line of JSON terminated by `\n`.
///
/// # Panics
///
/// Panics if `msg` cannot be serialized.
async fn send_msg<W: AsyncWriteExt + Unpin>(
    w: &mut W,
    msg: &ServerMessage,
) -> std::io::Result<()> {
    let mut frame = serde_json::to_vec(msg).expect("ServerMessage is always serializable");
    frame.push(b'\n');
    w.write_all(&frame).await
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::{BufRead, Write};
use std::net::TcpStream;
/// Starts a server without authentication on a free loopback port, backed by a
/// fresh temporary graphs directory, and returns its address.
///
/// The server runs on its own thread with its own runtime. The fixed sleep
/// gives it time to bind before the caller connects.
fn start_test_server() -> std::net::SocketAddr {
    // Ask the OS for a free port, then release it for the server to bind.
    let addr = {
        let probe = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        probe.local_addr().unwrap()
    };
    let graphs_dir = tempfile::tempdir().unwrap().into_path();
    let config = ServerConfig {
        server: auth::ServerSection { auth_required: false },
        users: vec![],
    };
    std::thread::spawn(move || {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime
            .block_on(serve(config, graphs_dir, addr, None))
            .unwrap();
    });
    std::thread::sleep(std::time::Duration::from_millis(150));
    addr
}
/// Starts a server that requires authentication for the given `users` on a
/// free loopback port and returns its address.
///
/// Mirrors `start_test_server` apart from the configuration.
fn start_auth_server(users: Vec<auth::UserEntry>) -> std::net::SocketAddr {
    // Reserve a free port and release it again so the server can bind it.
    let reserved = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = reserved.local_addr().unwrap();
    drop(reserved);

    let graphs_dir = tempfile::tempdir().unwrap().into_path();
    let server = auth::ServerSection { auth_required: true };
    let config = ServerConfig { server, users };

    std::thread::spawn(move || {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        let outcome = runtime.block_on(serve(config, graphs_dir, addr, None));
        outcome.unwrap();
    });

    std::thread::sleep(std::time::Duration::from_millis(150));
    addr
}
/// Connects to a server that does not require authentication, consumes and
/// checks the `hello` greeting, and returns the stream plus a buffered reader
/// over a clone of it.
fn connect_no_auth(addr: std::net::SocketAddr) -> (TcpStream, std::io::BufReader<TcpStream>) {
    let stream = TcpStream::connect(addr).unwrap();
    let mut reader = std::io::BufReader::new(stream.try_clone().unwrap());

    let mut greeting = String::new();
    reader.read_line(&mut greeting).unwrap();
    let greeting = serde_json::from_str::<serde_json::Value>(greeting.trim()).unwrap();
    assert_eq!(greeting["type"], "hello");
    assert_eq!(greeting["auth_required"], false);

    (stream, reader)
}
/// Connects to an authenticating server, consumes the `hello` greeting and logs
/// in as `user` with `password`.
///
/// Returns the stream plus a buffered reader over a clone of it.
///
/// # Panics
///
/// Panics if the server does not accept the credentials, so that a broken
/// login surfaces here rather than as a confusing failure in a later query.
fn connect_with_auth(
    addr: std::net::SocketAddr,
    user: &str,
    password: &str,
) -> (TcpStream, std::io::BufReader<TcpStream>) {
    let stream = TcpStream::connect(addr).unwrap();
    let mut reader = std::io::BufReader::new(stream.try_clone().unwrap());
    let mut writer = stream.try_clone().unwrap();

    let mut hello = String::new();
    reader.read_line(&mut hello).unwrap();
    let hello: serde_json::Value = serde_json::from_str(hello.trim()).unwrap();
    assert_eq!(hello["type"], "hello");

    let auth = serde_json::json!({"type":"auth","user":user,"password":password});
    let mut line = serde_json::to_string(&auth).unwrap();
    line.push('\n');
    writer.write_all(line.as_bytes()).unwrap();

    let mut resp = String::new();
    reader.read_line(&mut resp).unwrap();
    let resp: serde_json::Value = serde_json::from_str(resp.trim()).unwrap();
    // A successful login echoes the user name; a rejection is an `auth_fail`.
    assert_ne!(resp["type"], "auth_fail", "authentication rejected: {resp}");
    assert_eq!(resp["user"], user, "unexpected auth response: {resp}");

    (stream, reader)
}
/// Sends `req` as one JSON line on `stream` and returns the next line read from
/// `reader`, parsed as JSON.
fn roundtrip(
    stream: &mut TcpStream,
    reader: &mut std::io::BufReader<TcpStream>,
    req: serde_json::Value,
) -> serde_json::Value {
    let mut outgoing = serde_json::to_vec(&req).unwrap();
    outgoing.push(b'\n');
    stream.write_all(&outgoing).unwrap();

    let mut incoming = String::new();
    reader.read_line(&mut incoming).unwrap();
    serde_json::from_str(incoming.trim()).unwrap()
}
/// Inserts a node and reads its properties back over the wire.
#[test]
fn server_basic_query() {
    let (mut conn, mut reader) = connect_no_auth(start_test_server());

    let insert =
        serde_json::json!({"id": 1, "query": r#"INSERT (:Person {name: "Alice", age: 30})"#});
    let resp = roundtrip(&mut conn, &mut reader, insert);
    assert!(resp.get("error").is_none(), "insert error: {resp}");

    let select = serde_json::json!({"id": 2, "query": "MATCH (n:Person) RETURN n.name, n.age"});
    let resp = roundtrip(&mut conn, &mut reader, select);
    let rows = resp["rows"].as_array().unwrap();
    assert_eq!(rows.len(), 1);
    let person = &rows[0];
    assert_eq!(person["n.name"], "Alice");
    assert_eq!(person["n.age"], 30);
}
/// An unparsable query yields a response that carries an `error` field.
#[test]
fn server_error_response() {
    let (mut conn, mut reader) = connect_no_auth(start_test_server());
    let bogus = serde_json::json!({"id": 1, "query": "THIS IS NOT GQL %%%"});
    let resp = roundtrip(&mut conn, &mut reader, bogus);
    assert!(resp.get("error").is_some(), "expected error: {resp}");
}
/// A line that is not JSON is answered with an error response whose id is 0.
#[test]
fn server_invalid_json() {
    let (mut conn, mut reader) = connect_no_auth(start_test_server());
    conn.write_all(b"not json at all\n").unwrap();

    let mut raw = String::new();
    reader.read_line(&mut raw).unwrap();
    let resp = serde_json::from_str::<serde_json::Value>(raw.trim()).unwrap();

    assert!(resp.get("error").is_some());
    assert_eq!(resp["id"], 0);
}
/// Two inserts made between `BEGIN` and `COMMIT` are both visible afterwards.
#[test]
fn server_transaction_commit() {
    let (mut conn, mut reader) = connect_no_auth(start_test_server());

    let begun = roundtrip(&mut conn, &mut reader, serde_json::json!({"id":1,"query":"BEGIN"}));
    assert!(begun.get("error").is_none());

    for (id, city) in [(2, "NYC"), (3, "LA")] {
        let query = format!(r#"INSERT (:City {{name:"{city}"}})"#);
        roundtrip(
            &mut conn,
            &mut reader,
            serde_json::json!({"id": id, "query": query}),
        );
    }

    let committed =
        roundtrip(&mut conn, &mut reader, serde_json::json!({"id":4,"query":"COMMIT"}));
    assert!(committed.get("error").is_none());

    let listing = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"id":5,"query":"MATCH (c:City) RETURN c.name"}),
    );
    assert_eq!(listing["rows"].as_array().unwrap().len(), 2);
}
/// Every query response carries a numeric `elapsed_ms` field.
#[test]
fn server_elapsed_ms_present() {
    let (mut conn, mut reader) = connect_no_auth(start_test_server());
    let request = serde_json::json!({"id": 1, "query": "MATCH (n) RETURN n"});
    let resp = roundtrip(&mut conn, &mut reader, request);
    let elapsed = resp["elapsed_ms"].as_f64();
    assert!(elapsed.is_some());
}
/// The first line sent by the server is the `hello` greeting with protocol
/// version "2" and the authentication flag.
#[test]
fn server_hello_message() {
    let addr = start_test_server();
    let mut reader = std::io::BufReader::new(TcpStream::connect(addr).unwrap());

    let mut first_line = String::new();
    reader.read_line(&mut first_line).unwrap();
    let greeting = serde_json::from_str::<serde_json::Value>(first_line.trim()).unwrap();

    assert_eq!(greeting["type"], "hello");
    assert_eq!(greeting["version"], "2");
    assert_eq!(greeting["auth_required"], false);
}
/// A wrong password is answered with an `auth_fail` message.
#[test]
fn server_auth_required_rejects_bad_password() {
    let alice = auth::UserEntry {
        name: "alice".to_string(),
        password_hash: auth::hash_password("correct"),
        graphs: vec!["*".to_string()],
    };
    let addr = start_auth_server(vec![alice]);

    let mut conn = TcpStream::connect(addr).unwrap();
    let mut reader = std::io::BufReader::new(conn.try_clone().unwrap());

    let mut greeting = String::new();
    reader.read_line(&mut greeting).unwrap();
    let greeting = serde_json::from_str::<serde_json::Value>(greeting.trim()).unwrap();
    assert_eq!(greeting["auth_required"], true);

    // The auth exchange is one line out, one line back.
    let verdict = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"type":"auth","user":"alice","password":"wrong"}),
    );
    assert_eq!(verdict["type"], "auth_fail");
}
/// After logging in with valid credentials a query succeeds and returns rows.
#[test]
fn server_auth_ok_then_query() {
    let bob = auth::UserEntry {
        name: "bob".to_string(),
        password_hash: auth::hash_password("secret"),
        graphs: vec!["*".to_string()],
    };
    let addr = start_auth_server(vec![bob]);
    let (mut conn, mut reader) = connect_with_auth(addr, "bob", "secret");

    let request = serde_json::json!({"id":1,"query":"MATCH (n) RETURN n"});
    let resp = roundtrip(&mut conn, &mut reader, request);

    assert!(resp.get("error").is_none(), "unexpected error: {resp}");
    assert!(resp.get("rows").is_some());
}
/// The `graph` request field selects the graph: data written to `alpha` is
/// readable from `alpha` and absent from `beta`.
#[test]
fn server_graph_field_in_request() {
    let (mut conn, mut reader) = connect_no_auth(start_test_server());

    let written = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"id":1,"graph":"alpha","query":r#"INSERT (:T {x:1})"#}),
    );
    assert!(written.get("error").is_none(), "{written}");

    let from_alpha = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"id":2,"graph":"alpha","query":"MATCH (n:T) RETURN n.x"}),
    );
    assert_eq!(from_alpha["rows"][0]["n.x"], 1);

    let resp = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"id":3,"graph":"beta","query":"MATCH (n:T) RETURN n.x"}),
    );
    let beta_rows = resp["rows"].as_array().unwrap();
    assert!(beta_rows.is_empty(), "expected no rows in beta, got {resp}");
}
/// Graphs touched by a query show up in the `graphs` admin listing.
#[test]
fn server_admin_list_graphs() {
    let (mut conn, mut reader) = connect_no_auth(start_test_server());

    // Querying a graph is enough to make it exist.
    for (id, graph) in [(1, "g1"), (2, "g2")] {
        roundtrip(
            &mut conn,
            &mut reader,
            serde_json::json!({"id": id, "graph": graph, "query": "MATCH (n) RETURN n"}),
        );
    }

    let resp = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"type":"admin","cmd":"graphs"}),
    );
    assert_eq!(resp["type"], "admin_ok");

    let names: Vec<&str> = resp["graphs"]
        .as_array()
        .unwrap()
        .iter()
        .map(|entry| entry.as_str().unwrap())
        .collect();
    assert!(names.contains(&"g1"), "{resp}");
    assert!(names.contains(&"g2"), "{resp}");
}
/// `create` makes a graph appear in the listing and `drop` is accepted for it.
#[test]
fn server_admin_create_drop_graph() {
    let (mut conn, mut reader) = connect_no_auth(start_test_server());

    let created = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"type":"admin","cmd":"create","name":"newgraph"}),
    );
    assert_eq!(created["type"], "admin_ok", "{created}");

    let r = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"type":"admin","cmd":"graphs"}),
    );
    let listed = r["graphs"]
        .as_array()
        .unwrap()
        .iter()
        .any(|g| g.as_str() == Some("newgraph"));
    assert!(listed, "missing newgraph in {r}");

    let dropped = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"type":"admin","cmd":"drop","name":"newgraph"}),
    );
    assert_eq!(dropped["type"], "admin_ok", "{dropped}");
}
/// Graphs whose name starts with `_` never appear in the `graphs` listing, even
/// after the `_meta` graph has been written to.
#[test]
fn meta_graph_not_in_list() {
    let (mut conn, mut reader) = connect_no_auth(start_test_server());

    roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"id":1,"graph":"_meta","query":"INSERT (:SavedView {name: 'v1', graph: 'default', query: 'MATCH (n) RETURN n', created: '2026-01-01'})"}),
    );

    let resp = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"type":"admin","cmd":"graphs"}),
    );
    let only_user_graphs = resp["graphs"]
        .as_array()
        .unwrap()
        .iter()
        .filter_map(|g| g.as_str())
        .all(|name| !name.starts_with('_'));
    assert!(
        only_user_graphs,
        "system graph leaked into listing: {resp}"
    );
}
/// Dropping `_meta` is refused with an `admin_fail` that mentions "system graph".
#[test]
fn system_graph_drop_rejected() {
    let (mut conn, mut reader) = connect_no_auth(start_test_server());

    let resp = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"type":"admin","cmd":"drop","name":"_meta"}),
    );

    assert_eq!(resp["type"], "admin_fail", "expected admin_fail: {resp}");
    let error_text = resp["error"].as_str().unwrap_or("");
    assert!(
        error_text.contains("system graph"),
        "error message should mention system graph: {resp}"
    );
}
/// The `_meta` graph accepts ordinary inserts and queries.
#[test]
fn meta_graph_queryable() {
    let (mut conn, mut reader) = connect_no_auth(start_test_server());

    let inserted = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"id":1,"graph":"_meta","query":"INSERT (:SavedView {name: 'myview', graph: 'default', node_ids: 'AABBCC', created: '2026-01-01'})"}),
    );
    assert!(inserted.get("error").is_none(), "insert into _meta failed: {inserted}");

    let found = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"id":2,"graph":"_meta","query":"MATCH (v:SavedView) WHERE v.name = 'myview' RETURN v.name, v.graph"}),
    );
    assert!(found.get("error").is_none(), "query _meta failed: {found}");

    let rows = found["rows"].as_array().unwrap();
    assert_eq!(rows.len(), 1);
    let view = &rows[0];
    assert_eq!(view["v.name"], "myview");
    assert_eq!(view["v.graph"], "default");
}
/// Full saved-view lifecycle: create a node in `work`, store a `SavedView` in
/// `_meta` that references the node's id, list it, delete it, and check that it
/// is gone.
#[test]
fn saved_view_save_list_delete_roundtrip() {
    let (mut conn, mut reader) = connect_no_auth(start_test_server());

    // Create the node the view will point at.
    let r = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"id":1,"graph":"work","query":"INSERT (:Person {name: 'Alice'})"}),
    );
    assert!(r.get("error").is_none(), "insert failed: {r}");

    // Returning the node itself yields its id as a string.
    let r = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"id":2,"graph":"work","query":"MATCH (n:Person) RETURN n"}),
    );
    let ulid = r["rows"][0]["n"]
        .as_str()
        .expect("expected ULID string")
        .to_string();
    let only_alphanumeric = ulid.chars().all(|c| c.is_ascii_alphanumeric());
    assert!(only_alphanumeric, "unexpected chars in ULID: {ulid}");

    // Save.
    let save_gql = format!(
        "INSERT (:SavedView {{name: 'alice-view', graph: 'work', node_ids: '{ulid}', created: '2026-03-20'}})"
    );
    let r = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"id":3,"graph":"_meta","query": save_gql}),
    );
    assert!(r.get("error").is_none(), "save view failed: {r}");

    // List.
    let r = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"id":4,"graph":"_meta","query":"MATCH (v:SavedView) WHERE v.graph = 'work' RETURN v.name, v.node_ids, v.created ORDER BY v.created"}),
    );
    assert!(r.get("error").is_none(), "list views failed: {r}");
    let rows = r["rows"].as_array().unwrap();
    assert_eq!(rows.len(), 1, "expected 1 view, got {r}");
    let view = &rows[0];
    assert_eq!(view["v.name"], "alice-view");
    let stored_ids = view["v.node_ids"].as_str().unwrap();
    assert!(
        stored_ids.contains(ulid.as_str()),
        "node_ids should contain the ULID"
    );

    // Delete.
    let r = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"id":5,"graph":"_meta","query":"MATCH (v:SavedView) WHERE v.name = 'alice-view' AND v.graph = 'work' DELETE v"}),
    );
    assert!(r.get("error").is_none(), "delete view failed: {r}");

    // Verify it is gone.
    let r = roundtrip(
        &mut conn,
        &mut reader,
        serde_json::json!({"id":6,"graph":"_meta","query":"MATCH (v:SavedView) WHERE v.graph = 'work' RETURN v.name"}),
    );
    assert!(r.get("error").is_none(), "{r}");
    let remaining = r["rows"].as_array().unwrap();
    assert!(remaining.is_empty(), "view should be deleted, got {r}");
}
/// Data inserted inside a transaction that is never committed must not be
/// visible to a later connection once the first one has disconnected.
#[test]
fn server_auto_rollback_on_disconnect() {
    let addr = start_test_server();

    // First connection: open a transaction, insert, then hang up.
    let (mut first, mut first_reader) = connect_no_auth(addr);
    roundtrip(&mut first, &mut first_reader, serde_json::json!({"id":1,"query":"BEGIN"}));
    roundtrip(
        &mut first,
        &mut first_reader,
        serde_json::json!({"id":2,"query":r#"INSERT (:Transient {x:99})"#}),
    );
    drop((first, first_reader));

    // Give the server a moment to notice the disconnect.
    std::thread::sleep(std::time::Duration::from_millis(100));

    let (mut second, mut second_reader) = connect_no_auth(addr);
    let r = roundtrip(
        &mut second,
        &mut second_reader,
        serde_json::json!({"id":1,"query":"MATCH (n:Transient) RETURN n.x"}),
    );
    let leaked = r["rows"].as_array().unwrap();
    assert!(leaked.is_empty(), "rolled-back data leaked: {r}");
}
/// Starts a server without authentication and passes it a second free loopback
/// address as the GUI/HTTP address.
///
/// Returns `(tcp_addr, http_addr)`. The HTTP listener only exists when the
/// crate is built with the `gui` feature; otherwise `serve` ignores the address.
fn start_test_server_with_gui() -> (std::net::SocketAddr, std::net::SocketAddr) {
    // Reserve one free port per listener, releasing each immediately.
    let tcp_addr = {
        let probe = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        probe.local_addr().unwrap()
    };
    let http_addr = {
        let probe = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        probe.local_addr().unwrap()
    };

    let graphs_dir = tempfile::tempdir().unwrap().into_path();
    let config = ServerConfig {
        server: auth::ServerSection { auth_required: false },
        users: vec![],
    };

    std::thread::spawn(move || {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime
            .block_on(serve(config, graphs_dir, tcp_addr, Some(http_addr)))
            .unwrap();
    });

    std::thread::sleep(std::time::Duration::from_millis(200));
    (tcp_addr, http_addr)
}
/// Sends a minimal HTTP/1.1 `POST` with a JSON body to `path` on `http_addr`
/// and returns the response body parsed as JSON.
///
/// The request asks for `Connection: close`, and the whole response is read
/// until EOF; everything after the first blank line is treated as the body.
///
/// # Panics
///
/// Panics on any I/O failure, when the response has no header/body separator,
/// or when the body is not valid JSON.
fn http_post(
    http_addr: std::net::SocketAddr,
    path: &str,
    body: &serde_json::Value,
) -> serde_json::Value {
    use std::io::{Read, Write};

    let payload = serde_json::to_string(body).unwrap();
    let content_length = payload.len();
    let request = format!(
        "POST {path} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {content_length}\r\nConnection: close\r\n\r\n{payload}"
    );

    let mut conn = TcpStream::connect(http_addr).unwrap();
    conn.write_all(request.as_bytes()).unwrap();

    let mut raw = Vec::new();
    conn.read_to_end(&mut raw).unwrap();
    let text = String::from_utf8_lossy(&raw);

    let (_headers, rest) = text
        .split_once("\r\n\r\n")
        .expect("no header/body separator");
    let body_part = rest.trim();

    match serde_json::from_str::<serde_json::Value>(body_part) {
        Ok(parsed) => parsed,
        Err(e) => panic!("failed to parse HTTP response body as JSON: {e}\nbody: {body_part}"),
    }
}
/// Uploading a node CSV with `:ID` and `:LABEL` columns inserts every row and
/// returns an `id_map` keyed by the CSV ids.
///
/// Ignored without the `gui` feature: in that build `serve` does not start the
/// HTTP listener, so the request could never connect.
#[test]
#[cfg_attr(not(feature = "gui"), ignore = "requires the `gui` feature")]
fn http_upload_nodes_basic() {
    let (_tcp_addr, http_addr) = start_test_server_with_gui();
    let csv = ":ID,name,age,:LABEL\n1,Alice,30,Person\n2,Bob,25,Person\n";
    let resp = http_post(
        http_addr,
        "/api/upload/nodes",
        &serde_json::json!({ "csv": csv }),
    );
    assert_eq!(resp["inserted"], 2, "resp: {resp}");
    let id_map = resp["id_map"].as_object().unwrap();
    assert_eq!(id_map.len(), 2);
    assert!(id_map.contains_key("1"));
    assert!(id_map.contains_key("2"));
}
/// A node CSV without a `:LABEL` column is accepted when the request supplies a
/// `label`.
///
/// Ignored without the `gui` feature: in that build `serve` does not start the
/// HTTP listener, so the request could never connect.
#[test]
#[cfg_attr(not(feature = "gui"), ignore = "requires the `gui` feature")]
fn http_upload_nodes_with_label() {
    let (_tcp_addr, http_addr) = start_test_server_with_gui();
    let csv = ":ID,name\n1,Alice\n";
    let resp = http_post(
        http_addr,
        "/api/upload/nodes",
        &serde_json::json!({ "csv": csv, "label": "Employee" }),
    );
    assert_eq!(resp["inserted"], 1, "resp: {resp}");
}
/// Edges whose endpoints resolve through the `id_map` returned by a node upload
/// are inserted and none are skipped.
///
/// Ignored without the `gui` feature: in that build `serve` does not start the
/// HTTP listener, so the request could never connect.
#[test]
#[cfg_attr(not(feature = "gui"), ignore = "requires the `gui` feature")]
fn http_upload_edges_basic() {
    let (_tcp_addr, http_addr) = start_test_server_with_gui();

    let node_csv = ":ID,name,:LABEL\n1,Alice,Person\n2,Bob,Person\n";
    let node_resp = http_post(
        http_addr,
        "/api/upload/nodes",
        &serde_json::json!({ "csv": node_csv }),
    );
    assert_eq!(node_resp["inserted"], 2);
    let id_map = &node_resp["id_map"];

    let edge_csv = ":START_ID,:END_ID,:TYPE,weight\n1,2,KNOWS,0.9\n";
    let edge_resp = http_post(
        http_addr,
        "/api/upload/edges",
        &serde_json::json!({ "csv": edge_csv, "id_map": id_map }),
    );
    assert_eq!(edge_resp["inserted"], 1, "edge resp: {edge_resp}");
    assert_eq!(edge_resp["skipped"], 0);
}
/// An edge that references an id missing from the `id_map` is counted as
/// skipped rather than inserted.
///
/// Ignored without the `gui` feature: in that build `serve` does not start the
/// HTTP listener, so the request could never connect.
#[test]
#[cfg_attr(not(feature = "gui"), ignore = "requires the `gui` feature")]
fn http_upload_edges_skips_unresolved() {
    let (_tcp_addr, http_addr) = start_test_server_with_gui();

    let node_csv = ":ID,name,:LABEL\n1,Alice,Person\n";
    let node_resp = http_post(
        http_addr,
        "/api/upload/nodes",
        &serde_json::json!({ "csv": node_csv }),
    );
    let id_map = &node_resp["id_map"];

    // Id 99 was never uploaded, so the edge's end point cannot be resolved.
    let edge_csv = ":START_ID,:END_ID,:TYPE\n1,99,KNOWS\n";
    let edge_resp = http_post(
        http_addr,
        "/api/upload/edges",
        &serde_json::json!({ "csv": edge_csv, "id_map": id_map }),
    );
    assert_eq!(edge_resp["inserted"], 0);
    assert_eq!(edge_resp["skipped"], 1);
}
}