use serde_json::{Value, json};
use std::path::Path;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use tracing::{error, info};
use crate::actions::memory;
use crate::config::Config;
use crate::errors::{MCSError, Result};
use crate::kg::GraphHandle;
use crate::protocol::{JsonRpcRequest, JsonRpcResponse};
use crate::tools;
/// Outcome of a successfully handled JSON-RPC request.
enum HandlerResult {
/// A result already available as a `serde_json::Value`.
Value(Value),
/// A result held as a JSON string. `dispatch_line` splices it verbatim into the
/// response's `result` field; `process_value_http` re-parses it into a `Value`.
RawResult(String),
}
/// Capacity, in bytes, of the buffered readers and of the initial per-connection
/// output buffer.
const BUFFER_CAPACITY: usize = 65536;
/// Line terminator appended after every response written to a line-based connection.
const NEWLINE: &[u8] = b"\n";
/// Largest request line, in bytes, that the line readers will buffer (16 MiB).
pub const MAX_REQUEST_BYTES: usize = 16 * 1024 * 1024;
/// Number of permits in the semaphore that gates accepted TCP connections.
const MAX_TCP_CONNECTIONS: usize = 128;
/// Outcome of a single `read_line_capped` call.
enum LineRead {
/// A line (or trailing unterminated data at end of stream) was stored in the output string.
Line,
/// The stream ended with no pending bytes.
Eof,
/// The line would have exceeded the byte cap; nothing was stored.
TooLong,
}
/// Reads one `\n`-terminated line from `reader` into `out`, buffering at most `max` bytes.
///
/// `out` is cleared first. On [`LineRead::Line`] it holds the line including its
/// trailing `\n` (or the unterminated remainder if the stream ended mid-line).
/// On [`LineRead::Eof`] and [`LineRead::TooLong`] it is left empty.
///
/// # Errors
///
/// Propagates I/O errors from the reader, and returns `InvalidData` when the
/// line is not valid UTF-8.
async fn read_line_capped<R>(reader: &mut R, out: &mut String, max: usize) -> std::io::Result<LineRead>
where
    R: AsyncBufReadExt + Unpin,
{
    /// Converts the accumulated bytes into a `String` without copying them.
    fn into_line(bytes: Vec<u8>) -> std::io::Result<String> {
        String::from_utf8(bytes).map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::InvalidData, "Non-UTF-8 input")
        })
    }

    out.clear();
    let mut buf: Vec<u8> = Vec::new();
    loop {
        let available = reader.fill_buf().await?;
        if available.is_empty() {
            // End of stream: hand back whatever was accumulated, if anything.
            if buf.is_empty() {
                return Ok(LineRead::Eof);
            }
            *out = into_line(buf)?;
            return Ok(LineRead::Line);
        }
        match available.iter().position(|&b| b == b'\n') {
            Some(i) => {
                if buf.len() + i + 1 > max {
                    // Discard through the newline so the reader sits at a line boundary.
                    reader.consume(i + 1);
                    return Ok(LineRead::TooLong);
                }
                buf.extend_from_slice(&available[..=i]);
                reader.consume(i + 1);
                // `buf` is moved, not cloned: lines may be many megabytes long.
                *out = into_line(buf)?;
                return Ok(LineRead::Line);
            }
            None => {
                let take = available.len();
                if buf.len() + take > max {
                    // The rest of the oversized line is still unread here.
                    reader.consume(take);
                    return Ok(LineRead::TooLong);
                }
                buf.extend_from_slice(available);
                reader.consume(take);
            }
        }
    }
}
fn parse_error(msg: String) -> JsonRpcResponse {
let mcp_error = MCSError::ParseError(msg);
JsonRpcResponse::error(None, mcp_error.error_code(), mcp_error.to_string())
}
pub fn process_value(value: Value, kg: &GraphHandle) -> Option<Value> {
let req: JsonRpcRequest = match serde_json::from_value(value) {
Ok(r) => r,
Err(e) => return Some(to_value(parse_error(e.to_string()))),
};
req.id.as_ref()?;
let response = match process_request(&req, kg) {
Ok(HandlerResult::Value(result)) => Some(to_value(JsonRpcResponse::success(req.id, result))),
Ok(HandlerResult::RawResult(_)) => {
unreachable!("RawResult must be handled at the dispatch level, not via process_value");
}
Err(e) => Some(to_value(JsonRpcResponse::error(req.id, e.error_code(), e.to_string()))),
};
response
}
pub fn dispatch_line(line: &str, kg: &GraphHandle) -> Option<String> {
let trimmed = line.trim();
if trimmed.is_empty() {
return Some(serde_json::to_string(&parse_error("Empty request".into())).unwrap());
}
let raw: Value = match serde_json::from_str(trimmed) {
Ok(v) => v,
Err(e) => return Some(serde_json::to_string(&parse_error(e.to_string())).unwrap()),
};
let req: JsonRpcRequest = match serde_json::from_value(raw) {
Ok(r) => r,
Err(e) => return Some(serde_json::to_string(&parse_error(e.to_string())).unwrap()),
};
req.id.as_ref()?;
match process_request(&req, kg) {
Ok(HandlerResult::Value(result)) => {
let resp = JsonRpcResponse::success(req.id, result);
Some(serde_json::to_string(&resp).unwrap())
}
Ok(HandlerResult::RawResult(result_json)) => {
let id_json = serde_json::to_string(&req.id).unwrap();
let mut out = String::with_capacity(64 + id_json.len() + result_json.len());
out.push_str(r#"{"jsonrpc":"2.0","id":"#);
out.push_str(&id_json);
out.push_str(r#","result":"#);
out.push_str(&result_json);
out.push('}');
Some(out)
}
Err(e) => {
let resp = JsonRpcResponse::error(req.id, e.error_code(), e.to_string());
Some(serde_json::to_string(&resp).unwrap())
}
}
}
/// Handles an HTTP request body holding either one JSON-RPC message or a batch array.
///
/// Returns `Err` with the parser's message when the body is not valid JSON.
/// For a batch, the responses are collected into an array; `Ok(None)` is
/// returned when no element produced a response. A single message returns
/// whatever `process_value_http` returns for it.
pub fn dispatch_http_body(
    body: &str,
    kg: &GraphHandle,
) -> std::result::Result<Option<Value>, String> {
    let parsed = match serde_json::from_str::<Value>(body.trim()) {
        Ok(parsed) => parsed,
        Err(err) => return Err(err.to_string()),
    };

    match parsed {
        Value::Array(batch) => {
            let mut replies: Vec<Value> = Vec::new();
            for entry in batch {
                if let Some(reply) = process_value_http(entry, kg) {
                    replies.push(reply);
                }
            }
            if replies.is_empty() {
                Ok(None)
            } else {
                Ok(Some(Value::Array(replies)))
            }
        }
        single => Ok(process_value_http(single, kg)),
    }
}
fn process_value_http(value: Value, kg: &GraphHandle) -> Option<Value> {
let req: JsonRpcRequest = match serde_json::from_value(value) {
Ok(r) => r,
Err(e) => return Some(to_value(parse_error(e.to_string()))),
};
req.id.as_ref()?;
match process_request(&req, kg) {
Ok(HandlerResult::Value(result)) => {
Some(to_value(JsonRpcResponse::success(req.id, result)))
}
Ok(HandlerResult::RawResult(result_json)) => {
let result_val: Value = serde_json::from_str(&result_json)
.unwrap_or(Value::Null);
Some(to_value(JsonRpcResponse::success(req.id, result_val)))
}
Err(e) => {
Some(to_value(JsonRpcResponse::error(req.id, e.error_code(), e.to_string())))
}
}
}
/// Converts a response into a `serde_json::Value`.
///
/// # Panics
///
/// Panics if serialization fails.
#[inline]
fn to_value(resp: JsonRpcResponse) -> Value {
    let encoded = serde_json::to_value(resp);
    encoded.expect("JsonRpcResponse always serializes")
}
/// Server state shared by the stdio, TCP and HTTP front ends.
pub struct MCPServer {
// Configuration the server was constructed with (read for `auth_token`).
config: Arc<Config>,
// Handle to the knowledge graph, cloned into each connection task.
kg: Arc<GraphHandle>,
}
impl MCPServer {
pub fn new(config: Config) -> Result<Self> {
let path = Path::new(&config.memory_file_path);
let kg = GraphHandle::new(path, config.durability).map_err(MCSError::IoError)?;
Ok(Self {
config: Arc::new(config),
kg: Arc::new(kg),
})
}
pub fn graph(&self) -> Arc<GraphHandle> {
Arc::clone(&self.kg)
}
pub async fn run_stdio(&self) -> Result<()> {
let stdin = tokio::io::stdin();
let mut reader = BufReader::with_capacity(BUFFER_CAPACITY, stdin);
let mut stdout = tokio::io::stdout();
serve_line_conn(&mut reader, &mut stdout, Arc::clone(&self.kg)).await
}
pub async fn run_tcp(&self, addr: &str) -> Result<()> {
let listener = TcpListener::bind(addr).await.map_err(MCSError::IoError)?;
let semaphore = Arc::new(Semaphore::new(MAX_TCP_CONNECTIONS));
let auth_token = self.config.auth_token.clone();
info!(
"Listening for TCP MCP connections on {addr} (max {MAX_TCP_CONNECTIONS}, auth {})",
if auth_token.is_some() { "on" } else { "off" }
);
loop {
let permit = Arc::clone(&semaphore).acquire_owned().await;
let (socket, peer) = listener.accept().await.map_err(MCSError::IoError)?;
let kg = Arc::clone(&self.kg);
let auth_token = auth_token.clone();
tokio::spawn(async move {
let _permit = permit; let (read_half, mut write_half) = socket.into_split();
let mut reader = BufReader::with_capacity(BUFFER_CAPACITY, read_half);
if let Some(ref expected) = auth_token {
match authenticate_line_conn(&mut reader, expected).await {
Ok(true) => {}
Ok(false) => {
let _ = write_half
.write_all(AUTH_REQUIRED_LINE.as_bytes())
.await;
let _ = write_half.flush().await;
return;
}
Err(e) => {
error!("TCP auth error for {peer}: {e}");
return;
}
}
}
if let Err(e) = serve_line_conn(&mut reader, &mut write_half, kg).await {
error!("TCP connection {peer} error: {e}");
}
});
}
}
pub async fn run_http(&self, addr: &str) -> Result<()> {
crate::http::run(addr, self.graph(), self.config.auth_token.clone()).await
}
}
/// Newline-terminated JSON-RPC error (code -32001, null id) written to a TCP peer
/// whose first line did not authenticate.
const AUTH_REQUIRED_LINE: &str = "{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32001,\
\"message\":\"Authentication required: send the bearer token as the first line\"},\"id\":null}\n";
/// Reads the first line from `reader` and checks it against the `expected` token.
///
/// Returns `Ok(true)` only when a line was read and `token_matches` accepts it.
/// End of stream and an over-long line both yield `Ok(false)`.
///
/// The line is capped at the token length plus a small allowance (for the
/// `Bearer ` prefix, surrounding whitespace and the line ending), and never
/// above `MAX_REQUEST_BYTES`. This keeps an unauthenticated peer from making
/// the server buffer a full-size request before any credential check.
///
/// # Errors
///
/// Returns `MCSError::IoError` when reading the line fails.
async fn authenticate_line_conn<R>(reader: &mut R, expected: &str) -> Result<bool>
where
    R: AsyncBufReadExt + Unpin,
{
    /// Extra bytes allowed beyond the token itself.
    const AUTH_LINE_SLACK: usize = 256;

    let cap = expected
        .len()
        .saturating_add(AUTH_LINE_SLACK)
        .min(MAX_REQUEST_BYTES);
    let mut line = String::new();
    match read_line_capped(reader, &mut line, cap)
        .await
        .map_err(MCSError::IoError)?
    {
        LineRead::Line => Ok(token_matches(&line, expected)),
        LineRead::Eof | LineRead::TooLong => Ok(false),
    }
}
/// Serves newline-delimited JSON-RPC on one connection until it ends.
///
/// Each line is dispatched via `dispatch_line` on the blocking thread pool, and
/// any response is written back followed by a newline and a flush. The loop
/// stops on end of stream, on a read error (logged), or after answering an
/// over-long line with an error response.
///
/// # Errors
///
/// Returns an error when writing a response fails, when the over-size error
/// response cannot be serialized, or when the dispatch task panics. Read
/// errors are logged and end the loop with `Ok(())`.
async fn serve_line_conn<R, W>(reader: &mut R, writer: &mut W, kg: Arc<GraphHandle>) -> Result<()>
where
    R: AsyncBufReadExt + Unpin,
    W: AsyncWriteExt + Unpin,
{
    let mut line = String::new();
    let mut out = Vec::with_capacity(BUFFER_CAPACITY);
    loop {
        match read_line_capped(reader, &mut line, MAX_REQUEST_BYTES).await {
            Ok(LineRead::Eof) => break,
            Ok(LineRead::Line) => {
                // Move the line into the blocking task rather than cloning it: the next
                // read overwrites `line` anyway, and a request can be megabytes long.
                let request = std::mem::take(&mut line);
                let kg_clone = Arc::clone(&kg);
                let resp = tokio::task::spawn_blocking(move || dispatch_line(&request, &kg_clone))
                    .await
                    .map_err(|join_err| {
                        error!("dispatch task panicked: {join_err}");
                        MCSError::IoError(std::io::Error::other("dispatch task panicked"))
                    })?;
                // Notifications produce no response.
                if let Some(resp) = resp {
                    out.clear();
                    out.extend_from_slice(resp.as_bytes());
                    out.extend_from_slice(NEWLINE);
                    writer.write_all(&out).await.map_err(MCSError::IoError)?;
                    writer.flush().await.map_err(MCSError::IoError)?;
                }
            }
            Ok(LineRead::TooLong) => {
                let err = MCSError::InvalidParams("Request exceeds maximum size of 16MB".into());
                let response = JsonRpcResponse::error(None, err.error_code(), err.to_string());
                out.clear();
                serde_json::to_writer(&mut out, &response).map_err(MCSError::JsonError)?;
                out.extend_from_slice(NEWLINE);
                writer.write_all(&out).await.map_err(MCSError::IoError)?;
                writer.flush().await.map_err(MCSError::IoError)?;
                // The reader may sit mid-line after an oversized request, so stop here.
                break;
            }
            Err(e) => {
                error!("IO error: {}", e);
                break;
            }
        }
    }
    Ok(())
}
fn process_request(req: &JsonRpcRequest, kg: &GraphHandle) -> Result<HandlerResult> {
match req.method.as_str() {
"initialize" => Ok(HandlerResult::Value(handle_initialize(req))),
"tools/list" => Ok(HandlerResult::Value(handle_tools_list())),
"tools/call" => handle_tools_call(req, kg),
"ping" => Ok(HandlerResult::Value(Value::Null)),
method if method.starts_with("notifications/") => {
tracing::trace!("Received notification: {method}");
Ok(HandlerResult::Value(Value::Null))
}
_ => Err(MCSError::MethodNotFound(req.method.clone())),
}
}
/// Protocol versions that `handle_initialize` will echo back when a client requests them.
const SUPPORTED_PROTOCOL_VERSIONS: &[&str] =
&["2025-11-25", "2025-06-18", "2025-03-26", "2024-11-05"];
/// Version reported when the client requests none, or one that is not supported.
const LATEST_PROTOCOL_VERSION: &str = "2025-11-25";
/// Usage guidance returned in the `instructions` field of the `initialize` result.
const SERVER_INSTRUCTIONS: &str = "Knowledge-graph memory MCP server. Entity names are unique and \
case-sensitive. Use `create_entities`/`create_relations` to build the graph, `add_observations` to \
attach facts, and `search_nodes`/`open_nodes`/`read_graph` to retrieve. Prefer `upsert_entities` for \
idempotent writes and `merge_entities` to collapse duplicates. Tool failures are returned with \
`isError: true` rather than as protocol errors — read the message and retry.";
/// Builds the `initialize` result.
///
/// The client's `params.protocolVersion` is echoed back when it is a string
/// listed in `SUPPORTED_PROTOCOL_VERSIONS`; in every other case
/// `LATEST_PROTOCOL_VERSION` is reported. Only the `tools` capability is advertised.
fn handle_initialize(req: &JsonRpcRequest) -> Value {
    let requested = req
        .params
        .as_ref()
        .and_then(|params| params.get("protocolVersion"))
        .and_then(Value::as_str);

    let protocol_version = match requested {
        Some(version) if SUPPORTED_PROTOCOL_VERSIONS.contains(&version) => version,
        _ => LATEST_PROTOCOL_VERSION,
    };

    json!({
        "protocolVersion": protocol_version,
        "capabilities": {
            "tools": { "listChanged": false }
        },
        "serverInfo": {
            "name": "mcp-memory",
            "version": env!("CARGO_PKG_VERSION")
        },
        "instructions": SERVER_INSTRUCTIONS
    })
}
/// Builds a tool-call result flagged with `isError: true` whose only content
/// item is `message` as text.
#[inline]
fn tool_error(message: &str) -> Value {
    let text_item = json!({ "type": "text", "text": message });
    json!({
        "content": [text_item],
        "isError": true
    })
}
/// Compares a presented credential against the expected token.
///
/// Surrounding whitespace is ignored and a leading `Bearer ` prefix is
/// stripped before the bytes are compared with `subtle`'s `ct_eq`.
pub fn token_matches(presented: &str, expected: &str) -> bool {
    use subtle::ConstantTimeEq;

    let candidate = presented.trim();
    let candidate = match candidate.strip_prefix("Bearer ") {
        Some(rest) => rest.trim(),
        None => candidate,
    };
    bool::from(candidate.as_bytes().ct_eq(expected.as_bytes()))
}
/// Returns the `tools/list` result: `{ "tools": [...] }` built from the
/// `tools.json` file embedded at compile time.
///
/// The value is parsed once via `OnceLock::get_or_init` and cloned on each call.
///
/// # Panics
///
/// Panics if the embedded `tools.json` is not a JSON array.
fn handle_tools_list() -> Value {
    static CACHED: std::sync::OnceLock<Value> = std::sync::OnceLock::new();
    CACHED
        .get_or_init(|| {
            let tools: Vec<Value> = serde_json::from_str(include_str!("../tools.json"))
                .expect("embedded tools.json must be a JSON array");
            json!({ "tools": tools })
        })
        .clone()
}
fn handle_tools_call(req: &JsonRpcRequest, kg: &GraphHandle) -> Result<HandlerResult> {
let tool_name = req
.params
.as_ref()
.and_then(|p| p.get("name").and_then(|v| v.as_str()))
.ok_or_else(|| MCSError::InvalidParams("Missing 'name' parameter".into()))?;
let tool_args = req.params.as_ref().and_then(|p| p.get("arguments"));
if !tools::tool_exists(tool_name) {
return Err(MCSError::MethodNotFound(tool_name.to_string()));
}
let result = match tool_name {
"read_graph" => memory::handle_read_graph(kg, tool_args).map(HandlerResult::RawResult),
"search_nodes" => memory::handle_search_nodes(kg, tool_args).map(HandlerResult::RawResult),
"create_entities" => memory::handle_create_entities(kg, tool_args).map(HandlerResult::Value),
"create_relations" => memory::handle_create_relations(kg, tool_args).map(HandlerResult::Value),
"add_observations" => memory::handle_add_observations(kg, tool_args).map(HandlerResult::Value),
"delete_entities" => memory::handle_delete_entities(kg, tool_args).map(HandlerResult::Value),
"delete_observations" => memory::handle_delete_observations(kg, tool_args).map(HandlerResult::Value),
"delete_relations" => memory::handle_delete_relations(kg, tool_args).map(HandlerResult::Value),
"open_nodes" => memory::handle_open_nodes(kg, tool_args).map(HandlerResult::Value),
"get_entity" => memory::handle_get_entity(kg, tool_args).map(HandlerResult::Value),
"graph_stats" => memory::handle_graph_stats(kg).map(HandlerResult::Value),
"search_relations" => memory::handle_search_relations(kg, tool_args).map(HandlerResult::Value),
"find_path" => memory::handle_find_path(kg, tool_args).map(HandlerResult::Value),
"compact" => memory::handle_compact(kg).map(HandlerResult::Value),
"get_neighbors" => memory::handle_get_neighbors(kg, tool_args).map(HandlerResult::Value),
"describe_entity" => memory::handle_describe_entity(kg, tool_args).map(HandlerResult::Value),
"list_entity_types" => memory::handle_list_entity_types(kg).map(HandlerResult::Value),
"list_relation_types" => memory::handle_list_relation_types(kg).map(HandlerResult::Value),
"upsert_entities" => memory::handle_upsert_entities(kg, tool_args).map(HandlerResult::Value),
"export_graph" => memory::handle_export_graph(kg, tool_args).map(HandlerResult::Value),
"merge_entities" => memory::handle_merge_entities(kg, tool_args).map(HandlerResult::Value),
"extract_subgraph" => memory::handle_extract_subgraph(kg, tool_args).map(HandlerResult::Value),
"batch_get_entities" => memory::handle_batch_get_entities(kg, tool_args).map(HandlerResult::Value),
"find_all_paths" => memory::handle_find_all_paths(kg, tool_args).map(HandlerResult::Value),
tool => Err(MCSError::MethodNotFound(tool.to_string())),
};
Ok(result.unwrap_or_else(|e| {
error!("Tool '{tool_name}' error: {e}");
HandlerResult::Value(tool_error(&e.to_string()))
}))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Durability;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Gives every test in this process its own scratch file name.
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Opens a fresh graph backed by a uniquely named file in the platform's
    /// temporary directory (rather than a hard-coded `/tmp`).
    fn setup_kg() -> (Arc<GraphHandle>, std::path::PathBuf) {
        let pid = std::process::id();
        let seq = COUNTER.fetch_add(1, Ordering::SeqCst);
        let path = std::env::temp_dir().join(format!("mcp_mem_test_{pid}_{seq}.bin"));
        let kg = GraphHandle::new(path.as_path(), Durability::Async).unwrap();
        (Arc::new(kg), path)
    }

    /// Removes the scratch file; failure to remove it is ignored.
    fn cleanup(path: &Path) {
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn test_dispatch_line_valid_request() {
        let (kg, path) = setup_kg();
        let line = r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#;
        let resp = dispatch_line(line, &kg).unwrap();
        let v: Value = serde_json::from_str(&resp).unwrap();
        assert_eq!(v["id"], 1);
        assert_eq!(v["result"]["serverInfo"]["name"], "mcp-memory");
        cleanup(&path);
    }

    #[test]
    fn test_dispatch_line_invalid_json() {
        let (kg, path) = setup_kg();
        let resp = dispatch_line("{invalid}", &kg).unwrap();
        let v: Value = serde_json::from_str(&resp).unwrap();
        assert_eq!(v["error"]["code"], -32700);
        assert!(v["id"].is_null());
        cleanup(&path);
    }

    #[test]
    fn test_dispatch_line_empty() {
        let (kg, path) = setup_kg();
        let resp = dispatch_line(" \n", &kg).unwrap();
        let v: Value = serde_json::from_str(&resp).unwrap();
        assert_eq!(v["error"]["code"], -32700);
        cleanup(&path);
    }

    #[test]
    fn test_notification_has_no_response() {
        let (kg, path) = setup_kg();
        let line = r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#;
        assert!(dispatch_line(line, &kg).is_none());
        cleanup(&path);
    }

    #[test]
    fn test_unknown_method_error() {
        let (kg, path) = setup_kg();
        let line = r#"{"jsonrpc":"2.0","method":"does/not/exist","id":7}"#;
        let v: Value = serde_json::from_str(&dispatch_line(line, &kg).unwrap()).unwrap();
        assert_eq!(v["id"], 7);
        assert_eq!(v["error"]["code"], -32601);
        cleanup(&path);
    }

    #[test]
    fn test_tools_call_roundtrip_via_dispatch() {
        let (kg, path) = setup_kg();
        let create = r#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"create_entities","arguments":{"entities":[{"name":"Ada","entityType":"person","observations":["math"]}]}}}"#;
        assert!(dispatch_line(create, &kg).is_some());
        let read = r#"{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"read_graph","arguments":{}}}"#;
        let v: Value = serde_json::from_str(&dispatch_line(read, &kg).unwrap()).unwrap();
        let text = v["result"]["content"][0]["text"].as_str().unwrap();
        assert!(text.contains("Ada"));
        cleanup(&path);
    }

    #[test]
    fn test_http_body_batch_and_notifications() {
        let (kg, path) = setup_kg();
        // The raw string's inner lines are kept flush left so its bytes are unchanged.
        let batch = r#"[
{"jsonrpc":"2.0","method":"initialize","id":1},
{"jsonrpc":"2.0","method":"notifications/initialized"}
]"#;
        let out = dispatch_http_body(batch, &kg).unwrap().unwrap();
        let arr = out.as_array().unwrap();
        assert_eq!(arr.len(), 1);
        assert_eq!(arr[0]["id"], 1);
        let notif = r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#;
        assert!(dispatch_http_body(notif, &kg).unwrap().is_none());
        assert!(dispatch_http_body("{bad", &kg).is_err());
        cleanup(&path);
    }

    #[test]
    fn test_handle_initialize_response() {
        let (kg, path) = setup_kg();
        let req = JsonRpcRequest {
            jsonrpc: "2.0".to_string(),
            method: "initialize".to_string(),
            params: None,
            id: Some(Value::Number(1.into())),
        };
        let result = match process_request(&req, &kg).unwrap() {
            HandlerResult::Value(v) => v,
            HandlerResult::RawResult(_) => panic!("expected Value"),
        };
        assert_eq!(result["protocolVersion"], LATEST_PROTOCOL_VERSION);
        assert_eq!(result["serverInfo"]["name"], "mcp-memory");
        assert!(result["instructions"].is_string());
        cleanup(&path);
    }

    #[test]
    fn test_initialize_version_negotiation() {
        let (kg, path) = setup_kg();
        let req = JsonRpcRequest {
            jsonrpc: "2.0".to_string(),
            method: "initialize".to_string(),
            params: Some(json!({ "protocolVersion": "2024-11-05" })),
            id: Some(Value::Number(1.into())),
        };
        let result = match process_request(&req, &kg).unwrap() {
            HandlerResult::Value(v) => v,
            HandlerResult::RawResult(_) => panic!("expected Value"),
        };
        assert_eq!(result["protocolVersion"], "2024-11-05");
        cleanup(&path);
    }

    #[test]
    fn test_tool_error_on_bad_args() {
        let (kg, path) = setup_kg();
        let line = r#"{"jsonrpc":"2.0","id":9,"method":"tools/call","params":{"name":"get_entity","arguments":{}}}"#;
        let v: Value = serde_json::from_str(&dispatch_line(line, &kg).unwrap()).unwrap();
        assert!(v["error"].is_null(), "should not be a protocol error: {v}");
        assert_eq!(v["result"]["isError"], json!(true));
        cleanup(&path);
    }

    #[test]
    fn test_token_matches() {
        assert!(token_matches("secret", "secret"));
        assert!(token_matches("Bearer secret", "secret"));
        assert!(token_matches(" Bearer secret ", "secret"));
        assert!(!token_matches("wrong", "secret"));
        assert!(!token_matches("", "secret"));
        assert!(!token_matches("secre", "secret"));
        assert!(!token_matches("secretx", "secret"));
        assert!(!token_matches("Bearer ", "secret"));
    }

    /// Runs `initialize` with the given params and returns its result value.
    fn init_result(params: Option<Value>, kg: &GraphHandle) -> Value {
        let req = JsonRpcRequest {
            jsonrpc: "2.0".to_string(),
            method: "initialize".to_string(),
            params,
            id: Some(json!(1)),
        };
        match process_request(&req, kg).unwrap() {
            HandlerResult::Value(v) => v,
            HandlerResult::RawResult(_) => panic!("expected Value"),
        }
    }

    #[test]
    fn test_compliance_negotiation_matrix() {
        let (kg, path) = setup_kg();
        for v in SUPPORTED_PROTOCOL_VERSIONS {
            let r = init_result(Some(json!({ "protocolVersion": v })), &kg);
            assert_eq!(&r["protocolVersion"], v);
        }
        assert_eq!(
            init_result(Some(json!({ "protocolVersion": "1900-01-01" })), &kg)["protocolVersion"],
            LATEST_PROTOCOL_VERSION
        );
        assert_eq!(init_result(None, &kg)["protocolVersion"], "2025-11-25");
        cleanup(&path);
    }

    #[test]
    fn test_compliance_initialize_honest_with_instructions() {
        let (kg, path) = setup_kg();
        let r = init_result(None, &kg);
        assert!(r["capabilities"]["tools"].is_object());
        for cap in ["resources", "prompts", "logging", "completions"] {
            assert!(r["capabilities"][cap].is_null(), "must not advertise {cap}");
        }
        assert!(r["instructions"].as_str().is_some_and(|s| !s.is_empty()));
        cleanup(&path);
    }

    #[test]
    fn test_compliance_tool_success_is_content_wrapped() {
        let (kg, path) = setup_kg();
        let create = r#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"create_entities","arguments":{"entities":[{"name":"Ada","entityType":"person","observations":["math"]}]}}}"#;
        let v: Value = serde_json::from_str(&dispatch_line(create, &kg).unwrap()).unwrap();
        let content = v["result"]["content"].as_array().expect("content array");
        assert!(!content.is_empty());
        assert_eq!(content[0]["type"], "text");
        assert!(v["error"].is_null());
        cleanup(&path);
    }

    #[test]
    fn test_compliance_protocol_errors_remain_protocol_errors() {
        let (kg, path) = setup_kg();
        let line = r#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"no_such_tool","arguments":{}}}"#;
        let v: Value = serde_json::from_str(&dispatch_line(line, &kg).unwrap()).unwrap();
        assert_eq!(v["error"]["code"], -32601);
        assert!(v["result"].is_null());
        cleanup(&path);
    }

    #[tokio::test]
    async fn test_compliance_tcp_auth_handshake() {
        let mut ok = tokio::io::BufReader::new(&b"Bearer s3cr3t\n"[..]);
        assert!(authenticate_line_conn(&mut ok, "s3cr3t").await.unwrap());
        let mut bad = tokio::io::BufReader::new(&b"nope\n"[..]);
        assert!(!authenticate_line_conn(&mut bad, "s3cr3t").await.unwrap());
        let mut empty = tokio::io::BufReader::new(&b""[..]);
        assert!(!authenticate_line_conn(&mut empty, "s3cr3t").await.unwrap());
    }
}