use std::io::Write;
use std::net::SocketAddr;
use futures_util::SinkExt;
use rmpv::Value;
use rpc_runtime_client::RpcClient;
use rpc_runtime_core::{InstanceId, MethodId};
use rpc_runtime_errors::RuntimeErrorCode;
use rpc_runtime_transport_ipc::{FrameConfig, IpcEndpoint, IpcListener};
use rpc_runtime_transport_websocket::{WebSocketConfig, WebSocketConnection, WebSocketListener};
use tempfile::tempdir;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::Message;
use tripley_native_core::{
ARCHIVE_INSTANCE, ConfiguredNativePolicy, EVENT_NOTIFICATION_ID, FS_INSTANCE, NativePolicy,
NativePolicyConfig, NativeRpcServerOptions, NativeServiceSet, RUNTIME_INSTANCE,
SQLITE_INSTANCE, SYSTEM_INSTANCE, TCP_INSTANCE, WEBSOCKET_INSTANCE, build_native_rpc_server,
build_native_rpc_server_with_config, build_native_rpc_server_with_options,
build_native_rpc_server_with_policy,
};
/// Verifies that the constants in `tripley_native_core::generated::services`
/// agree with the runtime constants exported by `tripley_native_core`.
#[test]
fn generated_contract_matches_runtime_constants() {
    use tripley_native_core::generated::services;

    // (generated GUID string, runtime GUID constant), one pair per service.
    let guid_pairs = [
        (
            services::RUNTIME_SERVICE_SERVICE_GUID_STR,
            tripley_native_core::RUNTIME_SERVICE_GUID,
        ),
        (
            services::FILE_SYSTEM_SERVICE_SERVICE_GUID_STR,
            tripley_native_core::FS_SERVICE_GUID,
        ),
        (
            services::ARCHIVE_SERVICE_SERVICE_GUID_STR,
            tripley_native_core::ARCHIVE_SERVICE_GUID,
        ),
        (
            services::TCP_SERVICE_SERVICE_GUID_STR,
            tripley_native_core::TCP_SERVICE_GUID,
        ),
        (
            services::WEB_SOCKET_SERVICE_SERVICE_GUID_STR,
            tripley_native_core::WEBSOCKET_SERVICE_GUID,
        ),
        (
            services::SQLITE_SERVICE_SERVICE_GUID_STR,
            tripley_native_core::SQLITE_SERVICE_GUID,
        ),
        (
            services::SYSTEM_SERVICE_SERVICE_GUID_STR,
            tripley_native_core::SYSTEM_SERVICE_GUID,
        ),
    ];
    for (generated_guid, runtime_guid) in guid_pairs {
        assert_eq!(generated_guid, runtime_guid);
    }

    // Both event-emitting services must use the shared notification id.
    for generated_id in [
        services::TCP_SERVICE_EVENT_NOTIFICATION_ID,
        services::WEB_SOCKET_SERVICE_EVENT_NOTIFICATION_ID,
    ] {
        assert_eq!(generated_id, EVENT_NOTIFICATION_ID);
    }
}
#[tokio::test]
async fn fs_and_sqlite_services_work_over_xrpc() {
let (client, server_task) = connect().await;
let ids = client
.resolve_instance_ids(vec![
RUNTIME_INSTANCE.to_string(),
FS_INSTANCE.to_string(),
SQLITE_INSTANCE.to_string(),
])
.await
.expect("resolve native instances");
let runtime = InstanceId::new(ids[0]).expect("runtime id");
let fs = InstanceId::new(ids[1]).expect("fs id");
let sqlite = InstanceId::new(ids[2]).expect("sqlite id");
let dir = tempdir().expect("temp dir");
let file = dir.path().join("hello.txt");
client
.call(
fs,
MethodId::new(2),
Value::Array(vec![
Value::from(file.to_string_lossy().as_ref()),
Value::Binary(b"hello".to_vec()),
]),
)
.await
.expect("write file");
let payload = client
.call(
fs,
MethodId::new(1),
Value::Array(vec![Value::from(file.to_string_lossy().as_ref())]),
)
.await
.expect("read file");
assert_eq!(
payload,
Value::Array(vec![Value::Binary(b"hello".to_vec())])
);
let db_path = dir.path().join("test.db");
let db_id = client
.call(
sqlite,
MethodId::new(1),
Value::Array(vec![Value::from(db_path.to_string_lossy().as_ref())]),
)
.await
.expect("open sqlite");
let db_id = match db_id {
Value::Array(fields) => match &fields[0] {
Value::String(value) => value.as_str().expect("db id").to_string(),
other => panic!("unexpected db id field: {other:?}"),
},
other => panic!("unexpected db id payload: {other:?}"),
};
client
.call(
sqlite,
MethodId::new(3),
Value::Array(vec![
Value::from(db_id.as_str()),
Value::from("CREATE TABLE items(id INTEGER PRIMARY KEY, name TEXT);"),
]),
)
.await
.expect("create table");
let inserted = client
.call(
sqlite,
MethodId::new(4),
Value::Array(vec![
Value::from(db_id.as_str()),
Value::from("INSERT INTO items(name) VALUES (?)"),
Value::Array(vec![sqlite_text("alpha")]),
]),
)
.await
.expect("insert");
assert!(matches!(inserted, Value::Array(_)));
let row = client
.call(
sqlite,
MethodId::new(5),
Value::Array(vec![
Value::from(db_id.as_str()),
Value::from("SELECT name FROM items WHERE id = ?"),
Value::Array(vec![sqlite_integer(1)]),
]),
)
.await
.expect("query one");
assert!(matches!(row, Value::Array(_)));
client
.call(
sqlite,
MethodId::new(2),
Value::Array(vec![Value::from(db_id.as_str())]),
)
.await
.expect("explicit sqlite close");
let db_id = client
.call(
sqlite,
MethodId::new(1),
Value::Array(vec![Value::from(db_path.to_string_lossy().as_ref())]),
)
.await
.expect("reopen sqlite");
let db_id = match db_id {
Value::Array(fields) => match &fields[0] {
Value::String(value) => value.as_str().expect("db id").to_string(),
other => panic!("unexpected db id field: {other:?}"),
},
other => panic!("unexpected db id payload: {other:?}"),
};
client
.call(runtime, MethodId::new(3), Value::Array(vec![]))
.await
.expect("dispose resources");
let disposed = client
.call(
sqlite,
MethodId::new(3),
Value::Array(vec![Value::from(db_id.as_str()), Value::from("SELECT 1")]),
)
.await;
assert!(
disposed.is_err(),
"disposed sqlite db should not be callable"
);
client.goodbye("done").await.expect("goodbye");
server_task.abort();
}
/// A server built with only the runtime and filesystem services enabled must
/// resolve exactly those instances and list exactly those capabilities.
#[tokio::test]
async fn server_options_register_selected_services_and_capabilities() {
    // Start from the runtime-only set and switch the filesystem service on.
    let options = NativeRpcServerOptions {
        services: NativeServiceSet {
            fs: true,
            ..NativeServiceSet::runtime_only()
        },
        ..NativeRpcServerOptions::default()
    };
    let (client, server_task) =
        connect_with_server(build_native_rpc_server_with_options(options)).await;

    let requested = vec![
        RUNTIME_INSTANCE.to_string(),
        FS_INSTANCE.to_string(),
        SQLITE_INSTANCE.to_string(),
    ];
    let resolved = client
        .resolve_instance_ids(requested)
        .await
        .expect("resolve native instances");
    assert!(
        InstanceId::new(resolved[0]).is_some(),
        "runtime should resolve"
    );
    assert!(InstanceId::new(resolved[1]).is_some(), "fs should resolve");
    // SQLite was not enabled, so its slot must hold the raw id 0.
    assert_eq!(resolved[2], 0, "sqlite should not resolve");

    let runtime = InstanceId::new(resolved[0]).expect("runtime id");
    let listed = client
        .call(runtime, MethodId::new(2), Value::Array(Vec::new()))
        .await
        .expect("list capabilities");
    assert_eq!(string_values(listed), vec!["runtime.info", "fs"]);

    server_task.abort();
}
/// Serves a runtime-only native server over a WebSocket listener and checks
/// that a WebSocket client can handshake and list the capabilities.
#[tokio::test]
async fn native_server_accepts_websocket_transport() {
    let options = NativeRpcServerOptions {
        services: NativeServiceSet::runtime_only(),
        ..NativeRpcServerOptions::default()
    };
    let server = build_native_rpc_server_with_options(options);

    // Bind to port 0 and read the actual address back from the listener.
    let bind_addr = "127.0.0.1:0".parse::<SocketAddr>().expect("addr");
    let listener = WebSocketListener::bind(bind_addr, WebSocketConfig::default())
        .await
        .expect("bind websocket listener");
    let addr = listener.local_addr().expect("local addr");
    let server_task = server.spawn_listener(listener);

    let url = format!("ws://{addr}");
    let connection = WebSocketConnection::connect(url, WebSocketConfig::default())
        .await
        .expect("connect websocket client");
    let client = RpcClient::from_connection(connection)
        .await
        .expect("handshake websocket client");

    let resolved = client
        .resolve_instance_ids(vec![RUNTIME_INSTANCE.to_string()])
        .await
        .expect("resolve runtime");
    let runtime = InstanceId::new(resolved[0]).expect("runtime id");
    let listed = client
        .call(runtime, MethodId::new(2), Value::Array(Vec::new()))
        .await
        .expect("list capabilities");
    assert_eq!(string_values(listed), vec!["runtime.info"]);

    server_task.abort();
}
#[tokio::test]
async fn filesystem_open_file_handles_support_streaming_writes() {
let (client, server_task) = connect().await;
let ids = client
.resolve_instance_ids(vec![FS_INSTANCE.to_string()])
.await
.expect("resolve fs instance");
let fs = InstanceId::new(ids[0]).expect("fs id");
let dir = tempdir().expect("temp dir");
let file = dir.path().join("stream.log");
let handle = client
.call(
fs,
MethodId::new(11),
Value::Array(vec![
Value::from(file.to_string_lossy().as_ref()),
Value::Array(vec![
Value::Boolean(true),
Value::Boolean(true),
Value::Boolean(false),
Value::Boolean(true),
Value::Boolean(false),
Value::Boolean(false),
]),
]),
)
.await
.expect("open file");
let file_id = string_field(&handle, 0);
client
.call(
fs,
MethodId::new(13),
Value::Array(vec![
Value::from(file_id.as_str()),
Value::Binary(b"line 1\nline 2".to_vec()),
]),
)
.await
.expect("write file handle");
client
.call(
fs,
MethodId::new(14),
Value::Array(vec![Value::from(file_id.as_str())]),
)
.await
.expect("flush file handle");
let seek = client
.call(
fs,
MethodId::new(15),
Value::Array(vec![
Value::from(file_id.as_str()),
Value::from(0),
Value::from("start"),
]),
)
.await
.expect("seek file handle");
assert_eq!(u64_field(&seek, 0), 0);
let read = client
.call(
fs,
MethodId::new(12),
Value::Array(vec![Value::from(file_id.as_str()), Value::from(32)]),
)
.await
.expect("read file handle");
assert_eq!(
read,
Value::Array(vec![Value::Binary(b"line 1\nline 2".to_vec())])
);
client
.call(
fs,
MethodId::new(17),
Value::Array(vec![Value::from(file_id.as_str())]),
)
.await
.expect("close file handle");
client
.call(
fs,
MethodId::new(13),
Value::Array(vec![
Value::from(file_id.as_str()),
Value::Binary(b"!".to_vec()),
]),
)
.await
.expect_err("closed file handle should fail");
server_task.abort();
}
#[tokio::test]
async fn policy_can_deny_filesystem_access() {
let server = build_native_rpc_server_with_policy(std::sync::Arc::new(DenyFsPolicy));
let listener = IpcListener::bind(
IpcEndpoint::tcp("127.0.0.1:0".parse::<SocketAddr>().expect("addr")),
FrameConfig::default(),
)
.await
.expect("bind native test server");
let addr = listener.local_addr().expect("local addr");
let task = server.spawn_listener(listener);
let client = RpcClient::connect(IpcEndpoint::tcp(addr), FrameConfig::default())
.await
.expect("connect native client");
let ids = client
.resolve_instance_ids(vec![FS_INSTANCE.to_string()])
.await
.expect("resolve fs instance");
let fs = InstanceId::new(ids[0]).expect("fs id");
let result = client
.call(
fs,
MethodId::new(7),
Value::Array(vec![Value::from("/tmp")]),
)
.await;
assert!(result.is_err(), "filesystem policy denial should fail call");
task.abort();
}
/// With a read/write allowlist for one directory, a write inside it succeeds
/// and a write into another directory fails with `AccessDenied`.
#[tokio::test]
async fn configured_policy_limits_filesystem_paths() {
    let allowed_dir = tempdir().expect("allowed temp dir");
    let denied_dir = tempdir().expect("denied temp dir");

    let config = NativePolicyConfig::default().allow_fs_read_write(allowed_dir.path());
    let (client, task) = connect_with_server(build_native_rpc_server_with_config(config)).await;

    let resolved = client
        .resolve_instance_ids(vec![FS_INSTANCE.to_string()])
        .await
        .expect("resolve fs instance");
    let fs = InstanceId::new(resolved[0]).expect("fs id");

    // Write inside the allowlisted directory.
    let allowed_path = allowed_dir
        .path()
        .join("ok.txt")
        .to_string_lossy()
        .into_owned();
    let allowed_args = Value::Array(vec![
        Value::from(allowed_path.as_str()),
        Value::Binary(b"ok".to_vec()),
    ]);
    client
        .call(fs, MethodId::new(2), allowed_args)
        .await
        .expect("write allowed file");

    // The same call against the other directory must be rejected.
    let denied_path = denied_dir
        .path()
        .join("no.txt")
        .to_string_lossy()
        .into_owned();
    let denied_args = Value::Array(vec![
        Value::from(denied_path.as_str()),
        Value::Binary(b"no".to_vec()),
    ]);
    let rejection = client
        .call(fs, MethodId::new(2), denied_args)
        .await
        .expect_err("write outside allowlist should fail");
    assert_eq!(rejection.code, RuntimeErrorCode::AccessDenied);
    assert!(rejection.message.contains("write_file"));

    task.abort();
}
/// Round-trips a directory through the archive service.
///
/// Covers: rejecting a zip whose output lies inside the source directory,
/// zipping, unzipping, refusing to unzip over existing files when the
/// overwrite flag is `false`, and succeeding when it is `true`.
#[tokio::test]
async fn archive_service_zips_and_unzips_paths() {
    let (client, task) = connect().await;
    let resolved = client
        .resolve_instance_ids(vec![ARCHIVE_INSTANCE.to_string()])
        .await
        .expect("resolve archive instance");
    let archive = InstanceId::new(resolved[0]).expect("archive id");

    // Source tree: source/root.txt and source/nested/item.txt.
    let dir = tempdir().expect("temp dir");
    let source = dir.path().join("source");
    std::fs::create_dir_all(source.join("nested")).expect("create source");
    std::fs::write(source.join("root.txt"), b"root").expect("write root");
    std::fs::write(source.join("nested/item.txt"), b"item").expect("write nested");

    let source_str = source.to_string_lossy().into_owned();
    // Arguments for method 1 with the given output path; the two trailing
    // booleans are passed exactly as the service expects them positionally.
    let zip_args = |output: &str| {
        Value::Array(vec![
            Value::from(source_str.as_str()),
            Value::from(output),
            Value::Boolean(false),
            Value::Boolean(true),
        ])
    };

    // Writing the archive into its own source directory must be refused.
    let nested_output = source.join("bad.zip").to_string_lossy().into_owned();
    let self_contained = client
        .call(archive, MethodId::new(1), zip_args(&nested_output))
        .await
        .expect_err("zip output inside source should fail");
    assert!(self_contained.message.contains("source directory"));

    let zip_path = dir.path().join("source.zip");
    let zip_str = zip_path.to_string_lossy().into_owned();
    client
        .call(archive, MethodId::new(1), zip_args(&zip_str))
        .await
        .expect("zip directory");

    let destination = dir.path().join("out");
    let destination_str = destination.to_string_lossy().into_owned();
    // Arguments for method 2; the last field is the overwrite flag.
    let unzip_args = |overwrite: bool| {
        Value::Array(vec![
            Value::from(zip_str.as_str()),
            Value::from(destination_str.as_str()),
            Value::Boolean(overwrite),
        ])
    };

    client
        .call(archive, MethodId::new(2), unzip_args(false))
        .await
        .expect("unzip directory");
    assert_eq!(
        std::fs::read(destination.join("source/root.txt")).expect("read root"),
        b"root"
    );
    assert_eq!(
        std::fs::read(destination.join("source/nested/item.txt")).expect("read nested"),
        b"item"
    );

    // A second extraction without overwrite collides with the existing files.
    let existing = client
        .call(archive, MethodId::new(2), unzip_args(false))
        .await
        .expect_err("unzip without overwrite should fail on existing files");
    assert_eq!(existing.code, RuntimeErrorCode::InternalRuntimeError);
    assert!(existing.message.contains("already exists"));

    client
        .call(archive, MethodId::new(2), unzip_args(true))
        .await
        .expect("unzip with overwrite");

    task.abort();
}
/// An archive containing a `../` entry must be rejected on extraction and
/// must not create a file outside the destination directory.
#[tokio::test]
async fn archive_service_rejects_zip_slip_entries() {
    let (client, task) = connect().await;
    let resolved = client
        .resolve_instance_ids(vec![ARCHIVE_INSTANCE.to_string()])
        .await
        .expect("resolve archive instance");
    let archive = InstanceId::new(resolved[0]).expect("archive id");

    let dir = tempdir().expect("temp dir");
    let zip_path = dir.path().join("bad.zip");

    // Build the malicious archive in its own scope so the writer and file
    // are dropped before the service is asked to read it.
    {
        let output = std::fs::File::create(&zip_path).expect("create malicious zip");
        let mut writer = zip::ZipWriter::new(output);
        let entry_options = zip::write::SimpleFileOptions::default();
        writer
            .start_file("../evil.txt", entry_options)
            .expect("start malicious entry");
        writer.write_all(b"evil").expect("write malicious entry");
        writer.finish().expect("finish malicious zip");
    }

    let zip_str = zip_path.to_string_lossy().into_owned();
    let destination_str = dir.path().join("out").to_string_lossy().into_owned();
    let unzip_args = Value::Array(vec![
        Value::from(zip_str.as_str()),
        Value::from(destination_str.as_str()),
        Value::Boolean(false),
    ]);
    let rejection = client
        .call(archive, MethodId::new(2), unzip_args)
        .await
        .expect_err("zip slip entry should be rejected");
    assert_eq!(rejection.code, RuntimeErrorCode::InternalRuntimeError);
    assert!(rejection.message.contains("not enclosed"));
    // "../evil.txt" relative to "out" would land directly in the temp dir.
    assert!(!dir.path().join("evil.txt").exists());

    task.abort();
}
/// With a read/write allowlist for one directory, zipping from it into a
/// different directory must fail with `AccessDenied`.
#[tokio::test]
async fn configured_policy_limits_archive_paths() {
    let allowed_dir = tempdir().expect("allowed temp dir");
    let denied_dir = tempdir().expect("denied temp dir");

    let config = NativePolicyConfig::default().allow_fs_read_write(allowed_dir.path());
    let (client, task) = connect_with_server(build_native_rpc_server_with_config(config)).await;

    let resolved = client
        .resolve_instance_ids(vec![ARCHIVE_INSTANCE.to_string()])
        .await
        .expect("resolve archive instance");
    let archive = InstanceId::new(resolved[0]).expect("archive id");

    // The source lives inside the allowlist; only the output path is outside.
    let source = allowed_dir.path().join("source");
    std::fs::create_dir_all(&source).expect("create source");
    std::fs::write(source.join("ok.txt"), b"ok").expect("write source");

    let source_str = source.to_string_lossy().into_owned();
    let output_str = denied_dir
        .path()
        .join("out.zip")
        .to_string_lossy()
        .into_owned();
    let zip_args = Value::Array(vec![
        Value::from(source_str.as_str()),
        Value::from(output_str.as_str()),
        Value::Boolean(false),
        Value::Boolean(true),
    ]);
    let rejection = client
        .call(archive, MethodId::new(1), zip_args)
        .await
        .expect_err("zip archive path outside allowlist should fail");
    assert_eq!(rejection.code, RuntimeErrorCode::AccessDenied);
    assert!(rejection.message.contains("archive_zip_write"));

    task.abort();
}
#[tokio::test]
async fn configured_policy_limits_network_rules() {
let server = build_native_rpc_server_with_config(NativePolicyConfig::default());
let (client, task) = connect_with_server(server).await;
let ids = client
.resolve_instance_ids(vec![
TCP_INSTANCE.to_string(),
WEBSOCKET_INSTANCE.to_string(),
])
.await
.expect("resolve network instances");
let tcp = InstanceId::new(ids[0]).expect("tcp id");
let websocket = InstanceId::new(ids[1]).expect("websocket id");
let tcp_denied = client
.call(
tcp,
MethodId::new(5),
Value::Array(vec![Value::from("127.0.0.1"), Value::from(0_u64)]),
)
.await
.expect_err("tcp listen without allowlist should fail");
assert_eq!(tcp_denied.code, RuntimeErrorCode::AccessDenied);
assert!(tcp_denied.message.contains("tcp_listen"));
let ws_denied = client
.call(
websocket,
MethodId::new(1),
Value::Array(vec![Value::from("ws://127.0.0.1:65500")]),
)
.await
.expect_err("websocket connect without allowlist should fail");
assert_eq!(ws_denied.code, RuntimeErrorCode::AccessDenied);
assert!(ws_denied.message.contains("websocket_connect"));
task.abort();
let server = build_native_rpc_server_with_config(
NativePolicyConfig::default().allow_tcp("127.0.0.1", None),
);
let (client, task) = connect_with_server(server).await;
let ids = client
.resolve_instance_ids(vec![TCP_INSTANCE.to_string()])
.await
.expect("resolve tcp instance");
let tcp = InstanceId::new(ids[0]).expect("tcp id");
let listen = client
.call(
tcp,
MethodId::new(5),
Value::Array(vec![Value::from("127.0.0.1"), Value::from(0_u64)]),
)
.await
.expect("tcp listen should be allowed");
let server_id = match listen {
Value::Array(fields) => match &fields[0] {
Value::String(value) => value.as_str().expect("server id").to_string(),
other => panic!("unexpected server id field: {other:?}"),
},
other => panic!("unexpected tcp listen result: {other:?}"),
};
client
.call(
tcp,
MethodId::new(6),
Value::Array(vec![Value::from(server_id.as_str())]),
)
.await
.expect("close tcp server");
task.abort();
}
/// With a SQLite allowlist for one directory, opening a database inside it
/// succeeds and opening one in another directory fails with `AccessDenied`.
#[tokio::test]
async fn configured_policy_limits_sqlite_paths() {
    let allowed_dir = tempdir().expect("allowed temp dir");
    let denied_dir = tempdir().expect("denied temp dir");

    let config = NativePolicyConfig::default().allow_sqlite(allowed_dir.path());
    let (client, task) = connect_with_server(build_native_rpc_server_with_config(config)).await;

    let resolved = client
        .resolve_instance_ids(vec![SQLITE_INSTANCE.to_string()])
        .await
        .expect("resolve sqlite instance");
    let sqlite = InstanceId::new(resolved[0]).expect("sqlite id");

    let allowed_db = allowed_dir
        .path()
        .join("ok.db")
        .to_string_lossy()
        .into_owned();
    client
        .call(
            sqlite,
            MethodId::new(1),
            Value::Array(vec![Value::from(allowed_db.as_str())]),
        )
        .await
        .expect("open allowed sqlite db");

    let denied_db = denied_dir
        .path()
        .join("no.db")
        .to_string_lossy()
        .into_owned();
    let rejection = client
        .call(
            sqlite,
            MethodId::new(1),
            Value::Array(vec![Value::from(denied_db.as_str())]),
        )
        .await
        .expect_err("sqlite outside allowlist should fail");
    assert_eq!(rejection.code, RuntimeErrorCode::AccessDenied);
    assert!(rejection.message.contains("sqlite"));

    task.abort();
}
/// The default configured policy reports itself as "configured" in the
/// runtime info and denies the shutdown call; a policy built with
/// `allow_shutdown()` passes the `allow_power("shutdown")` check.
#[tokio::test]
async fn configured_policy_denies_power_by_default() {
    let default_config = NativePolicyConfig::default();
    let (client, task) =
        connect_with_server(build_native_rpc_server_with_config(default_config)).await;

    let resolved = client
        .resolve_instance_ids(vec![
            RUNTIME_INSTANCE.to_string(),
            SYSTEM_INSTANCE.to_string(),
        ])
        .await
        .expect("resolve native instances");
    let runtime = InstanceId::new(resolved[0]).expect("runtime id");
    let system = InstanceId::new(resolved[1]).expect("system id");

    let info_fields = match client
        .call(runtime, MethodId::new(1), Value::Array(Vec::new()))
        .await
        .expect("get runtime info")
    {
        Value::Array(fields) => fields,
        _ => panic!("unexpected runtime info payload"),
    };
    // Field 5 of the runtime info is compared against the policy mode string.
    assert_eq!(info_fields[5], Value::from("configured"));

    let shutdown_args = Value::Array(vec![Value::from(0_u64)]);
    let rejection = client
        .call(system, MethodId::new(2), shutdown_args)
        .await
        .expect_err("shutdown should be denied by default");
    assert_eq!(rejection.code, RuntimeErrorCode::AccessDenied);
    assert!(rejection.message.contains("shutdown"));

    // Only the policy check is exercised here; no shutdown call is sent.
    let permissive = ConfiguredNativePolicy::new(NativePolicyConfig::default().allow_shutdown());
    permissive
        .allow_power("shutdown")
        .expect("explicit shutdown policy should allow policy check");

    task.abort();
}
/// Starts a TCP and a WebSocket listener through the native services, sends
/// data to each from a plain client, and checks that the matching event
/// notification reaches the RPC client.
#[tokio::test]
async fn tcp_and_websocket_servers_emit_events() {
    let (client, server_task) = connect().await;
    let resolved = client
        .resolve_instance_ids(vec![
            TCP_INSTANCE.to_string(),
            WEBSOCKET_INSTANCE.to_string(),
        ])
        .await
        .expect("resolve network instances");
    let tcp = InstanceId::new(resolved[0]).expect("tcp id");
    let websocket = InstanceId::new(resolved[1]).expect("websocket id");

    // Both listen calls take the same (host, port) argument pair.
    let loopback_port_zero =
        || Value::Array(vec![Value::from("127.0.0.1"), Value::from(0_u64)]);

    // --- TCP: subscribe before listening, then push bytes at the listener.
    let mut tcp_events = client.subscribe_notifications(Some(tcp), Some(EVENT_NOTIFICATION_ID));
    let tcp_reply = client
        .call(tcp, MethodId::new(5), loopback_port_zero())
        .await
        .expect("tcp listen");
    let tcp_addr = listen_endpoint(tcp_reply);
    let mut tcp_stream = TcpStream::connect(&tcp_addr).await.expect("tcp connect");
    tcp_stream.write_all(b"ping").await.expect("tcp write");
    let data_event = recv_event_kind(&mut tcp_events, "data").await;
    assert_eq!(data_event[3], Value::Binary(b"ping".to_vec()));

    // --- WebSocket: same flow with a text message.
    let mut ws_events =
        client.subscribe_notifications(Some(websocket), Some(EVENT_NOTIFICATION_ID));
    let ws_reply = client
        .call(websocket, MethodId::new(5), loopback_port_zero())
        .await
        .expect("websocket listen");
    let ws_url = listen_endpoint(ws_reply);
    let (mut ws_stream, _) = tokio_tungstenite::connect_async(ws_url)
        .await
        .expect("websocket connect");
    ws_stream
        .send(Message::Text("hello".into()))
        .await
        .expect("websocket send");
    let text_event = recv_event_kind(&mut ws_events, "text").await;
    assert_eq!(text_event[4], Value::from("hello"));

    client.goodbye("done").await.expect("goodbye");
    server_task.abort();
}
/// Test policy that rejects every filesystem path check.
struct DenyFsPolicy;

impl NativePolicy for DenyFsPolicy {
    /// Mode label reported by this policy.
    fn mode(&self) -> &'static str {
        "deny-fs-test"
    }

    /// Always fails with `AccessDenied`, whatever the operation or path.
    fn allow_fs_path(
        &self,
        _operation: &str,
        _path: &std::path::Path,
    ) -> Result<(), rpc_runtime_errors::RuntimeError> {
        let denial = rpc_runtime_errors::RuntimeError::runtime(
            rpc_runtime_errors::RuntimeErrorCode::AccessDenied,
            "filesystem access denied by test policy",
        );
        Err(denial)
    }
}
/// Builds the six-field array used as a SQLite text parameter: the tag
/// `"text"` in slot 0, `value` in slot 3, and `Nil` everywhere else.
fn sqlite_text(value: &str) -> Value {
    let slots = [
        Value::from("text"),
        Value::Nil,
        Value::Nil,
        Value::from(value),
        Value::Nil,
        Value::Nil,
    ];
    Value::Array(Vec::from(slots))
}
/// Builds the six-field array used as a SQLite integer parameter: the tag
/// `"integer"` in slot 0, `value` in slot 1, and `Nil` everywhere else.
fn sqlite_integer(value: i64) -> Value {
    let slots = [
        Value::from("integer"),
        Value::from(value),
        Value::Nil,
        Value::Nil,
        Value::Nil,
        Value::Nil,
    ];
    Value::Array(Vec::from(slots))
}
/// Extracts the endpoint string from field 1 of a listen reply.
///
/// # Panics
///
/// Panics if `value` is not an array, has no field 1, or field 1 is not a
/// valid UTF-8 string.
fn listen_endpoint(value: Value) -> String {
    let fields = match value {
        Value::Array(fields) => fields,
        other => panic!("unexpected listen result: {other:?}"),
    };
    let endpoint = match &fields[1] {
        Value::String(text) => text.as_str().expect("endpoint"),
        other => panic!("unexpected endpoint field: {other:?}"),
    };
    endpoint.to_string()
}
/// Converts an array of string values into a `Vec<String>`.
///
/// # Panics
///
/// Panics if `value` is not an array or if any element is not a valid UTF-8
/// string.
fn string_values(value: Value) -> Vec<String> {
    let items = match value {
        Value::Array(items) => items,
        other => panic!("unexpected string list: {other:?}"),
    };
    let mut strings = Vec::with_capacity(items.len());
    for item in items {
        match item {
            Value::String(text) => strings.push(text.as_str().expect("string").to_string()),
            other => panic!("unexpected string list item: {other:?}"),
        }
    }
    strings
}
/// Returns the string stored at position `index` of an array payload.
///
/// # Panics
///
/// Panics if `value` is not an array, `index` is out of range, or the field
/// is not a valid UTF-8 string.
fn string_field(value: &Value, index: usize) -> String {
    let Value::Array(fields) = value else {
        panic!("unexpected array payload: {value:?}");
    };
    let Value::String(text) = &fields[index] else {
        panic!("unexpected string field: {:?}", &fields[index]);
    };
    text.as_str().expect("string field").to_string()
}
/// Returns the unsigned integer stored at position `index` of an array
/// payload.
///
/// # Panics
///
/// Panics if `value` is not an array, `index` is out of range, or the field
/// is not an integer representable as `u64`.
fn u64_field(value: &Value, index: usize) -> u64 {
    let Value::Array(fields) = value else {
        panic!("unexpected array payload: {value:?}");
    };
    let Value::Integer(number) = &fields[index] else {
        panic!("unexpected u64 field: {:?}", &fields[index]);
    };
    number.as_u64().expect("u64 field")
}
/// Waits for a notification whose first payload field is the string
/// `expected_kind` and returns that notification's payload fields.
///
/// At most ten notifications are inspected, each awaited for up to two
/// seconds; notifications of another kind are discarded.
///
/// # Panics
///
/// Panics on a receive timeout, a failed receive, a non-array payload, an
/// empty payload array, or when none of the inspected notifications match.
async fn recv_event_kind(
    receiver: &mut rpc_runtime_client::RpcNotificationReceiver,
    expected_kind: &str,
) -> Vec<Value> {
    let per_event_timeout = std::time::Duration::from_secs(2);
    for _attempt in 0..10 {
        let notification = tokio::time::timeout(per_event_timeout, receiver.recv())
            .await
            .expect("event timeout")
            .expect("event");
        let fields = match notification.payload {
            Value::Array(fields) => fields,
            _ => panic!("unexpected event payload"),
        };
        let kind_matches = match &fields[0] {
            Value::String(kind) => kind.as_str() == Some(expected_kind),
            _ => false,
        };
        if kind_matches {
            return fields;
        }
    }
    panic!("did not receive event kind {expected_kind}");
}
/// Starts a server from `build_native_rpc_server()` and returns a connected
/// client together with the server's task handle.
async fn connect() -> (
    RpcClient,
    tokio::task::JoinHandle<Result<(), rpc_runtime_errors::RuntimeError>>,
) {
    let default_server = build_native_rpc_server();
    connect_with_server(default_server).await
}
/// Serves `server` on a loopback TCP IPC listener and connects a client to it.
///
/// Returns the connected client and the handle of the spawned listener task;
/// callers abort the handle when they are done.
///
/// # Panics
///
/// Panics if binding, reading the local address, or connecting fails.
async fn connect_with_server(
    server: rpc_runtime_server::RpcServer,
) -> (
    RpcClient,
    tokio::task::JoinHandle<Result<(), rpc_runtime_errors::RuntimeError>>,
) {
    // Bind to port 0 and read the actual address back from the listener.
    let bind_addr = "127.0.0.1:0".parse::<SocketAddr>().expect("addr");
    let listener = IpcListener::bind(IpcEndpoint::tcp(bind_addr), FrameConfig::default())
        .await
        .expect("bind native test server");
    let bound_addr = listener.local_addr().expect("local addr");
    let server_task = server.spawn_listener(listener);

    let client = RpcClient::connect(IpcEndpoint::tcp(bound_addr), FrameConfig::default())
        .await
        .expect("connect native client");
    (client, server_task)
}