tripley_native_node_addon/
lib.rs1use std::sync::Arc;
2
3use napi::bindgen_prelude::*;
4use napi_derive::napi;
5use rpc_runtime_transport::{AddonConfig, AddonConnection, AddonEndpoint, TransportError};
6use tokio::sync::{Mutex, mpsc};
7use tripley_native_core::{
8 NativePolicyConfig, build_native_rpc_server, build_native_rpc_server_with_config,
9};
10
11#[napi]
12pub struct TripleyNativeServer {
13 endpoint: AddonEndpoint,
14 outbound: Arc<Mutex<mpsc::UnboundedReceiver<Vec<u8>>>>,
15 task: Mutex<Option<tokio::task::JoinHandle<()>>>,
16}
17
18#[napi]
19impl TripleyNativeServer {
20 #[napi]
21 pub async fn send_frame(&self, frame: Buffer) -> Result<()> {
22 self.endpoint
23 .receive_client_frame(frame.to_vec())
24 .await
25 .map_err(to_napi_error)
26 }
27
28 #[napi]
29 pub async fn next_frame(&self) -> Result<Option<Buffer>> {
30 let mut outbound = self.outbound.lock().await;
31 Ok(outbound.recv().await.map(Buffer::from))
32 }
33
34 #[napi]
35 pub async fn close(&self) -> Result<()> {
36 self.endpoint
37 .close_client_input()
38 .await
39 .map_err(to_napi_error)?;
40 if let Some(task) = self.task.lock().await.take() {
41 task.abort();
42 }
43 Ok(())
44 }
45}
46
47#[napi]
48pub async fn create_tripley_native_server() -> TripleyNativeServer {
49 create_server(build_native_rpc_server())
50}
51
52#[napi]
53pub async fn create_tripley_native_server_with_policy_config(
54 policy_config_json: String,
55) -> Result<TripleyNativeServer> {
56 let config =
57 NativePolicyConfig::from_json(&policy_config_json).map_err(to_napi_runtime_error)?;
58 Ok(create_server(build_native_rpc_server_with_config(config)))
59}
60
61fn create_server(server: rpc_runtime_server::RpcServer) -> TripleyNativeServer {
62 let (outbound_tx, outbound_rx) = mpsc::unbounded_channel();
63 let (connection, endpoint) = AddonConnection::new(AddonConfig::default(), move |frame| {
64 let outbound_tx = outbound_tx.clone();
65 async move {
66 outbound_tx.send(frame).map_err(|_| {
67 TransportError::Io(std::io::Error::new(
68 std::io::ErrorKind::BrokenPipe,
69 "Tripley Native addon outbound frame receiver is closed",
70 ))
71 })
72 }
73 });
74
75 let task = tokio::spawn(async move {
76 if let Err(error) = server.serve_connection(connection).await {
77 eprintln!("Tripley Native Node addon server failed: {error}");
78 }
79 });
80
81 TripleyNativeServer {
82 endpoint,
83 outbound: Arc::new(Mutex::new(outbound_rx)),
84 task: Mutex::new(Some(task)),
85 }
86}
87
88fn to_napi_error(error: TransportError) -> Error {
89 Error::new(Status::GenericFailure, error.to_string())
90}
91
92fn to_napi_runtime_error(error: rpc_runtime_errors::RuntimeError) -> Error {
93 Error::new(Status::InvalidArg, error.to_string())
94}