forge_worker_sdk/
server.rs1use std::sync::Arc;
21
22use futures::{SinkExt, StreamExt};
23use tokio::io::{AsyncRead, AsyncWrite};
24use tokio::sync::mpsc;
25use tokio_util::codec::Framed;
26use tracing::{error, info, warn};
27
28use crate::dispatcher::{BaseDispatcher, WorkerHandler};
29use crate::framing::{Encoding, Frame, FrameCodec};
30use crate::protocol::WireEvent;
31
32pub async fn run_worker<H: WorkerHandler>(
39 socket_path: &str,
40 handler: H,
41 encoding: Encoding,
42) -> anyhow::Result<()> {
43 let dispatcher = Arc::new(BaseDispatcher::new(handler, encoding));
44
45 let gc_reg = dispatcher.registry.clone();
47 tokio::spawn(async move {
48 let mut tick = tokio::time::interval(std::time::Duration::from_secs(60));
49 loop {
50 tick.tick().await;
51 gc_reg.gc(50);
52 }
53 });
54
55 serve(socket_path, dispatcher, encoding).await
56}
57
58async fn serve<H: WorkerHandler>(
61 socket_path: &str,
62 dispatcher: Arc<BaseDispatcher<H>>,
63 encoding: Encoding,
64) -> anyhow::Result<()> {
65 #[cfg(unix)]
66 return serve_unix(socket_path, dispatcher, encoding).await;
67
68 #[cfg(windows)]
69 return serve_windows(socket_path, dispatcher, encoding).await;
70}
71
72#[cfg(unix)]
75async fn serve_unix<H: WorkerHandler>(
76 socket_path: &str,
77 dispatcher: Arc<BaseDispatcher<H>>,
78 encoding: Encoding,
79) -> anyhow::Result<()> {
80 use interprocess::local_socket::tokio::prelude::*;
81 use interprocess::local_socket::{GenericFilePath, ListenerOptions, ToFsName};
82
83 let _ = std::fs::remove_file(socket_path);
84 let name = socket_path.to_fs_name::<GenericFilePath>()?;
85 let listener = ListenerOptions::new().name(name).create_tokio()?;
86 info!(path = socket_path, "forge worker listening on Unix socket");
87
88 loop {
89 match listener.accept().await {
90 Ok(conn) => {
91 let d = dispatcher.clone();
92 tokio::spawn(async move {
93 info!("client connected");
94 let (rx, tx) = conn.split();
95 handle_connection(rx, tx, d, encoding).await;
96 info!("client disconnected");
97 });
98 }
99 Err(e) => error!("accept error: {e}"),
100 }
101 }
102}
103
104#[cfg(windows)]
107async fn serve_windows<H: WorkerHandler>(
108 pipe_name: &str,
109 dispatcher: Arc<BaseDispatcher<H>>,
110 encoding: Encoding,
111) -> anyhow::Result<()> {
112 use interprocess::os::windows::named_pipe::pipe_mode;
113 use interprocess::os::windows::named_pipe::{PipeListenerOptions, PipeMode};
114
115 let listener = PipeListenerOptions::new()
116 .path(pipe_name)
117 .mode(PipeMode::Bytes)
118 .create_tokio_duplex::<pipe_mode::Bytes>()?;
119 info!(pipe = pipe_name, "forge worker listening on Windows named pipe");
120
121 loop {
122 match listener.accept().await {
123 Ok(conn) => {
124 let d = dispatcher.clone();
125 tokio::spawn(async move {
126 let (rx, tx) = tokio::io::split(conn);
127 handle_connection(rx, tx, d, encoding).await;
128 });
129 }
130 Err(e) => error!("accept error: {e}"),
131 }
132 }
133}
134
135async fn handle_connection<R, W, H>(
143 reader: R,
144 writer: W,
145 dispatcher: Arc<BaseDispatcher<H>>,
146 encoding: Encoding,
147)
148where
149 R: AsyncRead + Unpin + Send + 'static,
150 W: AsyncWrite + Unpin + Send + 'static,
151 H: WorkerHandler,
152{
153 let (out_tx, mut out_rx) = mpsc::unbounded_channel::<Frame>();
155 let (event_tx, mut event_rx) = mpsc::unbounded_channel::<WireEvent>();
157
158 let out_tx_evt = out_tx.clone();
160 tokio::spawn(async move {
161 while let Some(ev) = event_rx.recv().await {
162 if out_tx_evt.send(Frame::Event(ev)).is_err() {
163 break;
164 }
165 }
166 });
167
168 let mut sink = Framed::new(writer, FrameCodec::new(encoding));
170 let write_handle = tokio::spawn(async move {
171 while let Some(frame) = out_rx.recv().await {
172 if let Err(e) = sink.send(frame).await {
173 warn!("write error: {e}");
174 break;
175 }
176 }
177 });
178
179 let mut stream = Framed::new(reader, FrameCodec::new(encoding));
181 while let Some(result) = stream.next().await {
182 match result {
183 Ok(Frame::Request(req)) => {
184 let d = dispatcher.clone();
185 let evt_tx = event_tx.clone();
186 let resp_tx = out_tx.clone();
187 tokio::spawn(async move {
188 let resp = d.dispatch(req, evt_tx).await;
189 let _ = resp_tx.send(Frame::Response(resp));
190 });
191 }
192 Ok(_) => warn!("unexpected frame kind received from client — ignoring"),
193 Err(e) => {
194 error!("frame decode error: {e}");
195 break;
196 }
197 }
198 }
199
200 write_handle.abort();
201}