use std::sync::Arc;
use napi::bindgen_prelude::*;
use napi_derive::napi;
use rpc_runtime_transport::{AddonConfig, AddonConnection, AddonEndpoint, TransportError};
use tokio::sync::{Mutex, mpsc};
use tripley_native_core::{
NativePolicyConfig, build_native_rpc_server, build_native_rpc_server_with_config,
};
#[napi]
pub struct TripleyNativeServer {
endpoint: AddonEndpoint,
outbound: Arc<Mutex<mpsc::UnboundedReceiver<Vec<u8>>>>,
task: Mutex<Option<tokio::task::JoinHandle<()>>>,
}
#[napi]
impl TripleyNativeServer {
#[napi]
pub async fn send_frame(&self, frame: Buffer) -> Result<()> {
self.endpoint
.receive_client_frame(frame.to_vec())
.await
.map_err(to_napi_error)
}
#[napi]
pub async fn next_frame(&self) -> Result<Option<Buffer>> {
let mut outbound = self.outbound.lock().await;
Ok(outbound.recv().await.map(Buffer::from))
}
#[napi]
pub async fn close(&self) -> Result<()> {
self.endpoint
.close_client_input()
.await
.map_err(to_napi_error)?;
if let Some(task) = self.task.lock().await.take() {
task.abort();
}
Ok(())
}
}
#[napi]
pub async fn create_tripley_native_server() -> TripleyNativeServer {
create_server(build_native_rpc_server())
}
#[napi]
pub async fn create_tripley_native_server_with_policy_config(
policy_config_json: String,
) -> Result<TripleyNativeServer> {
let config =
NativePolicyConfig::from_json(&policy_config_json).map_err(to_napi_runtime_error)?;
Ok(create_server(build_native_rpc_server_with_config(config)))
}
fn create_server(server: rpc_runtime_server::RpcServer) -> TripleyNativeServer {
let (outbound_tx, outbound_rx) = mpsc::unbounded_channel();
let (connection, endpoint) = AddonConnection::new(AddonConfig::default(), move |frame| {
let outbound_tx = outbound_tx.clone();
async move {
outbound_tx.send(frame).map_err(|_| {
TransportError::Io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Tripley Native addon outbound frame receiver is closed",
))
})
}
});
let task = tokio::spawn(async move {
if let Err(error) = server.serve_connection(connection).await {
eprintln!("Tripley Native Node addon server failed: {error}");
}
});
TripleyNativeServer {
endpoint,
outbound: Arc::new(Mutex::new(outbound_rx)),
task: Mutex::new(Some(task)),
}
}
fn to_napi_error(error: TransportError) -> Error {
Error::new(Status::GenericFailure, error.to_string())
}
fn to_napi_runtime_error(error: rpc_runtime_errors::RuntimeError) -> Error {
Error::new(Status::InvalidArg, error.to_string())
}