#![allow(dead_code, unused_imports)]
use std::path::{Path, PathBuf};
use std::sync::Arc;
use serde::{Serialize, de::DeserializeOwned};
use serde_json::{Value, json};
use sqry_daemon::{
DaemonConfig, EmptyGraphBuilder, IpcServer, RebuildDispatcher, SocketConfig, WorkspaceBuilder,
WorkspaceManager,
ipc::framing::{read_frame_json, write_frame_json},
ipc::protocol::{DaemonHello, DaemonHelloResponse, JsonRpcError, JsonRpcResponse},
ipc::shim_registry::ShimRegistry,
};
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use tokio_util::sync::CancellationToken;
/// Handle to an in-process `IpcServer` started for a test.
///
/// Built by [`TestServer::with_builder_and_config`] (or one of its
/// convenience wrappers) and torn down with [`TestServer::stop`].
pub struct TestServer {
/// Socket path reported by the bound server (`server.socket_path()`).
pub path: PathBuf,
/// Token handed to `IpcServer::bind`; [`TestServer::stop`] cancels it.
pub shutdown: CancellationToken,
/// Join handle of the task spawned to drive `server.run()`.
pub handle: tokio::task::JoinHandle<sqry_daemon::DaemonResult<()>>,
/// Workspace manager shared with the server.
pub manager: Arc<WorkspaceManager>,
/// Rebuild dispatcher shared with the server.
pub dispatcher: Arc<RebuildDispatcher>,
/// Shim registry obtained from the bound server.
pub shim_registry: Arc<ShimRegistry>,
/// Temporary directory in which the socket path was requested. Held here so
/// the directory is not dropped while the server is in use.
pub _tmp: TempDir,
}
impl TestServer {
    /// Starts a server using [`EmptyGraphBuilder`] and `DaemonConfig::default()`.
    pub async fn new() -> Self {
        let builder: Arc<dyn WorkspaceBuilder> = Arc::new(EmptyGraphBuilder);
        Self::with_builder(builder).await
    }

    /// Starts a server using the supplied `builder` and `DaemonConfig::default()`.
    pub async fn with_builder(builder: Arc<dyn WorkspaceBuilder>) -> Self {
        let config = DaemonConfig::default();
        Self::with_builder_and_config(builder, config).await
    }

    /// Starts a server using [`EmptyGraphBuilder`] and the supplied `config`.
    pub async fn with_config(config: DaemonConfig) -> Self {
        let builder: Arc<dyn WorkspaceBuilder> = Arc::new(EmptyGraphBuilder);
        Self::with_builder_and_config(builder, config).await
    }

    /// Binds an `IpcServer` on a socket inside a fresh temporary directory,
    /// spawns its run loop on the tokio runtime and waits for the socket path
    /// to show up on disk.
    ///
    /// The `socket` section of `config_in` is always replaced so that the
    /// server listens on `<tempdir>/sqryd.sock`; every other setting is taken
    /// from `config_in` unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the temporary directory cannot be created, if binding fails,
    /// or if the socket path does not appear within two seconds.
    pub async fn with_builder_and_config(
        builder: Arc<dyn WorkspaceBuilder>,
        config_in: DaemonConfig,
    ) -> Self {
        let tmp = TempDir::new().expect("tempdir");

        // Override only the socket location; keep the caller's other settings.
        let socket = SocketConfig {
            path: Some(tmp.path().join("sqryd.sock")),
            pipe_name: None,
        };
        let config = Arc::new(DaemonConfig {
            socket,
            ..config_in
        });

        let manager = WorkspaceManager::new_without_reaper(Arc::clone(&config));
        let plugins = Arc::new(sqry_plugin_registry::create_plugin_manager());
        let dispatcher = RebuildDispatcher::new(
            Arc::clone(&manager),
            Arc::clone(&config),
            Arc::clone(&plugins),
        );
        let tool_executor = Arc::new(sqry_core::query::executor::QueryExecutor::new());
        let shutdown = CancellationToken::new();

        // The server receives its own handles; the originals stay with the
        // returned `TestServer` so tests can inspect shared state.
        let server = IpcServer::bind(
            Arc::clone(&config),
            Arc::clone(&manager),
            Arc::clone(&dispatcher),
            builder,
            tool_executor,
            shutdown.clone(),
        )
        .await
        .expect("bind");

        // Capture everything needed from `server` before `run()` consumes it.
        let path = server.socket_path().to_path_buf();
        let shim_registry = server.shim_registry();
        let handle = tokio::spawn(server.run());

        wait_for_socket(&path, std::time::Duration::from_secs(2)).await;

        Self {
            path,
            shutdown,
            handle,
            manager,
            dispatcher,
            shim_registry,
            _tmp: tmp,
        }
    }

    /// Returns another handle to the server's shim registry.
    pub fn shim_registry(&self) -> Arc<ShimRegistry> {
        Arc::clone(&self.shim_registry)
    }

    /// Cancels the shutdown token and waits for the server task to finish.
    ///
    /// # Panics
    ///
    /// Panics if the server task cannot be joined or if `run()` returned an
    /// error.
    pub async fn stop(self) {
        self.shutdown.cancel();
        self.handle
            .await
            .expect("join")
            .expect("server run returns Ok");
    }
}
/// Polls every 5 ms until `path` exists on disk.
///
/// Existence is tested *before* the deadline on every iteration, so a path
/// that is already present is accepted even when `timeout` is zero, and a
/// path that appears during the final sleep is still noticed.
///
/// # Panics
///
/// Panics with `socket at <path> never appeared` if `path` does not exist at
/// a check made at or after `timeout` has elapsed.
async fn wait_for_socket(path: &Path, timeout: std::time::Duration) {
    let deadline = std::time::Instant::now() + timeout;
    loop {
        if path.exists() {
            return;
        }
        // Only give up after an existence check has failed at/after the deadline.
        if std::time::Instant::now() >= deadline {
            panic!("socket at {} never appeared", path.display());
        }
        tokio::time::sleep(std::time::Duration::from_millis(5)).await;
    }
}
/// Minimal test client that exchanges frames with the daemon over a Unix
/// socket.
pub struct TestIpcClient {
/// Connected stream to the daemon socket.
stream: UnixStream,
/// `id` used for the next JSON-RPC request; starts at 1 and is incremented
/// by every call to `request`.
next_id: i64,
}
impl TestIpcClient {
    /// Connects to the Unix socket at `path`.
    ///
    /// # Panics
    ///
    /// Panics if the connection cannot be established.
    pub async fn connect(path: &Path) -> Self {
        Self {
            stream: UnixStream::connect(path).await.expect("connect"),
            next_id: 1,
        }
    }

    /// Sends a `DaemonHello` carrying `protocol_version` (client version
    /// `test/0.0.1`) and returns the daemon's reply.
    ///
    /// # Panics
    ///
    /// Panics if writing or reading fails, or if no reply frame is received.
    pub async fn hello(&mut self, protocol_version: u32) -> DaemonHelloResponse {
        let greeting = DaemonHello {
            client_version: "test/0.0.1".into(),
            protocol_version,
        };
        write_frame_json(&mut self.stream, &greeting)
            .await
            .expect("write hello");

        let reply = read_frame_json::<_, DaemonHelloResponse>(&mut self.stream)
            .await
            .expect("read hello");
        reply.expect("some frame")
    }

    /// Serializes `value` and writes it as a single frame.
    ///
    /// # Panics
    ///
    /// Panics if the frame cannot be written.
    pub async fn send_raw<T: Serialize>(&mut self, value: &T) {
        let written = write_frame_json(&mut self.stream, value).await;
        written.expect("write raw");
    }

    /// Writes `bytes` verbatim, preceded by their length as a 4-byte
    /// little-endian `u32`, then flushes the stream.
    ///
    /// The payload is not validated in any way, so this can be used to send
    /// bodies that are not valid JSON.
    // NOTE(review): the prefix presumably mirrors the layout produced by
    // `write_frame_json` — confirm against the framing module.
    ///
    /// # Panics
    ///
    /// Panics if `bytes.len()` does not fit in a `u32` or if any write fails.
    pub async fn send_raw_bytes(&mut self, bytes: &[u8]) {
        let prefix = u32::try_from(bytes.len()).unwrap().to_le_bytes();
        self.stream.write_all(&prefix).await.unwrap();
        self.stream.write_all(bytes).await.unwrap();
        self.stream.flush().await.unwrap();
    }

    /// Reads the next frame and decodes it as a [`JsonRpcResponse`].
    ///
    /// # Panics
    ///
    /// Panics if reading fails or if no frame is received.
    pub async fn read_response(&mut self) -> JsonRpcResponse {
        let frame = read_frame_json::<_, JsonRpcResponse>(&mut self.stream)
            .await
            .expect("read response");
        frame.expect("some response")
    }

    /// Reads the next frame and decodes it as `T`.
    ///
    /// # Panics
    ///
    /// Panics if reading fails or if no frame is received.
    pub async fn read_typed<T: DeserializeOwned>(&mut self) -> T {
        let frame = read_frame_json::<_, T>(&mut self.stream)
            .await
            .expect("read typed");
        frame.expect("some typed frame")
    }

    /// Sends a JSON-RPC 2.0 request for `method` with `params` and waits for
    /// the next response frame.
    ///
    /// Each call consumes one request id from the client's internal counter.
    pub async fn request(&mut self, method: &str, params: Value) -> JsonRpcResponse {
        let id = self.next_id;
        self.next_id = id + 1;

        let envelope = json!({
            "jsonrpc": "2.0",
            "id": id,
            "method": method,
            "params": params,
        });
        self.send_raw(&envelope).await;
        self.read_response().await
    }

    /// Sends a JSON-RPC 2.0 message for `method` with `params` and no `id`
    /// member; no response is read.
    pub async fn notify(&mut self, method: &str, params: Value) {
        let envelope = json!({
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
        });
        self.send_raw(&envelope).await;
    }
}
pub fn expect_success(resp: &JsonRpcResponse) -> &Value {
match &resp.payload {
sqry_daemon::ipc::protocol::JsonRpcPayload::Success { result } => result,
sqry_daemon::ipc::protocol::JsonRpcPayload::Error { error } => {
panic!("expected success, got error: {error:?}")
}
}
}
pub fn expect_error(resp: &JsonRpcResponse) -> &JsonRpcError {
match &resp.payload {
sqry_daemon::ipc::protocol::JsonRpcPayload::Error { error } => error,
sqry_daemon::ipc::protocol::JsonRpcPayload::Success { result } => {
panic!("expected error, got success: {result:?}")
}
}
}