Skip to main content

tripley_native_node_addon/
lib.rs

1use std::sync::Arc;
2
3use napi::bindgen_prelude::*;
4use napi_derive::napi;
5use rpc_runtime_transport::{AddonConfig, AddonConnection, AddonEndpoint, TransportError};
6use tokio::sync::{Mutex, mpsc};
7use tripley_native_core::{
8    NativePolicyConfig, build_native_rpc_server, build_native_rpc_server_with_config,
9};
10
11#[napi]
12pub struct TripleyNativeServer {
13    endpoint: AddonEndpoint,
14    outbound: Arc<Mutex<mpsc::UnboundedReceiver<Vec<u8>>>>,
15    task: Mutex<Option<tokio::task::JoinHandle<()>>>,
16}
17
18#[napi]
19impl TripleyNativeServer {
20    #[napi]
21    pub async fn send_frame(&self, frame: Buffer) -> Result<()> {
22        self.endpoint
23            .receive_client_frame(frame.to_vec())
24            .await
25            .map_err(to_napi_error)
26    }
27
28    #[napi]
29    pub async fn next_frame(&self) -> Result<Option<Buffer>> {
30        let mut outbound = self.outbound.lock().await;
31        Ok(outbound.recv().await.map(Buffer::from))
32    }
33
34    #[napi]
35    pub async fn close(&self) -> Result<()> {
36        self.endpoint
37            .close_client_input()
38            .await
39            .map_err(to_napi_error)?;
40        if let Some(task) = self.task.lock().await.take() {
41            task.abort();
42        }
43        Ok(())
44    }
45}
46
47#[napi]
48pub async fn create_tripley_native_server() -> TripleyNativeServer {
49    create_server(build_native_rpc_server())
50}
51
52#[napi]
53pub async fn create_tripley_native_server_with_policy_config(
54    policy_config_json: String,
55) -> Result<TripleyNativeServer> {
56    let config =
57        NativePolicyConfig::from_json(&policy_config_json).map_err(to_napi_runtime_error)?;
58    Ok(create_server(build_native_rpc_server_with_config(config)))
59}
60
61fn create_server(server: rpc_runtime_server::RpcServer) -> TripleyNativeServer {
62    let (outbound_tx, outbound_rx) = mpsc::unbounded_channel();
63    let (connection, endpoint) = AddonConnection::new(AddonConfig::default(), move |frame| {
64        let outbound_tx = outbound_tx.clone();
65        async move {
66            outbound_tx.send(frame).map_err(|_| {
67                TransportError::Io(std::io::Error::new(
68                    std::io::ErrorKind::BrokenPipe,
69                    "Tripley Native addon outbound frame receiver is closed",
70                ))
71            })
72        }
73    });
74
75    let task = tokio::spawn(async move {
76        if let Err(error) = server.serve_connection(connection).await {
77            eprintln!("Tripley Native Node addon server failed: {error}");
78        }
79    });
80
81    TripleyNativeServer {
82        endpoint,
83        outbound: Arc::new(Mutex::new(outbound_rx)),
84        task: Mutex::new(Some(task)),
85    }
86}
87
88fn to_napi_error(error: TransportError) -> Error {
89    Error::new(Status::GenericFailure, error.to_string())
90}
91
92fn to_napi_runtime_error(error: rpc_runtime_errors::RuntimeError) -> Error {
93    Error::new(Status::InvalidArg, error.to_string())
94}