1use std::{
7 pin::Pin,
8 sync::{Arc, Mutex},
9};
10
11use super::{
12 code_server::CodeServerArgs,
13 control_server::ServerTermination,
14 dev_tunnels::{ActiveTunnel, StatusLock},
15 protocol,
16 shutdown_signal::{ShutdownRequest, ShutdownSignal},
17};
18use crate::{
19 async_pipe::socket_stream_split,
20 json_rpc::{new_json_rpc, start_json_rpc, JsonRpcSerializer},
21 log,
22 rpc::{RpcCaller, RpcDispatcher},
23 singleton::SingletonServer,
24 state::LauncherPaths,
25 tunnels::code_server::print_listening,
26 update_service::Platform,
27 util::{
28 errors::{AnyError, CodeError},
29 ring_buffer::RingBuffer,
30 sync::{Barrier, ConcatReceivable},
31 },
32};
33use futures::future::Either;
34use tokio::{
35 pin,
36 sync::{broadcast, mpsc},
37 task::JoinHandle,
38};
39
40pub struct SingletonServerArgs<'a> {
41 pub server: &'a mut RpcServer,
42 pub log: log::Logger,
43 pub tunnel: ActiveTunnel,
44 pub paths: &'a LauncherPaths,
45 pub code_server_args: &'a CodeServerArgs,
46 pub platform: Platform,
47 pub shutdown: Barrier<ShutdownSignal>,
48 pub log_broadcast: &'a BroadcastLogSink,
49}
50
51struct StatusInfo {
52 name: String,
53 lock: StatusLock,
54}
55
56#[derive(Clone)]
57struct SingletonServerContext {
58 log: log::Logger,
59 shutdown_tx: broadcast::Sender<ShutdownSignal>,
60 broadcast_tx: broadcast::Sender<Vec<u8>>,
61 current_status: Arc<Mutex<Option<StatusInfo>>>,
67}
68
69pub struct RpcServer {
70 fut: JoinHandle<Result<(), CodeError>>,
71 shutdown_broadcast: broadcast::Sender<ShutdownSignal>,
72 current_status: Arc<Mutex<Option<StatusInfo>>>,
73}
74
75pub fn make_singleton_server(
76 log_broadcast: BroadcastLogSink,
77 log: log::Logger,
78 server: SingletonServer,
79 shutdown_rx: Barrier<ShutdownSignal>,
80) -> RpcServer {
81 let (shutdown_broadcast, _) = broadcast::channel(4);
82 let rpc = new_json_rpc();
83
84 let current_status = Arc::new(Mutex::default());
85 let mut rpc = rpc.methods(SingletonServerContext {
86 log: log.clone(),
87 shutdown_tx: shutdown_broadcast.clone(),
88 broadcast_tx: log_broadcast.get_brocaster(),
89 current_status: current_status.clone(),
90 });
91
92 rpc.register_sync(
93 protocol::singleton::METHOD_RESTART,
94 |_: protocol::EmptyObject, ctx| {
95 info!(ctx.log, "restarting tunnel after client request");
96 let _ = ctx.shutdown_tx.send(ShutdownSignal::RpcRestartRequested);
97 Ok(())
98 },
99 );
100
101 rpc.register_sync(
102 protocol::singleton::METHOD_STATUS,
103 |_: protocol::EmptyObject, c| {
104 Ok(c.current_status
105 .lock()
106 .unwrap()
107 .as_ref()
108 .map(|s| protocol::singleton::StatusWithTunnelName {
109 name: Some(s.name.clone()),
110 status: s.lock.read(),
111 })
112 .unwrap_or_default())
113 },
114 );
115
116 rpc.register_sync(
117 protocol::singleton::METHOD_SHUTDOWN,
118 |_: protocol::EmptyObject, ctx| {
119 info!(
120 ctx.log,
121 "closing tunnel and all clients after a shutdown request"
122 );
123 let _ = ctx.broadcast_tx.send(RpcCaller::serialize_notify(
124 &JsonRpcSerializer {},
125 protocol::singleton::METHOD_SHUTDOWN,
126 protocol::EmptyObject {},
127 ));
128 let _ = ctx.shutdown_tx.send(ShutdownSignal::RpcShutdownRequested);
129 Ok(())
130 },
131 );
132
133 let fut = tokio::spawn(async move {
136 serve_singleton_rpc(log_broadcast, server, rpc.build(log), shutdown_rx).await
137 });
138 RpcServer {
139 shutdown_broadcast,
140 current_status,
141 fut,
142 }
143}
144
145pub async fn start_singleton_server<'a>(
146 args: SingletonServerArgs<'_>,
147) -> Result<ServerTermination, AnyError> {
148 let shutdown_rx = ShutdownRequest::create_rx([
149 ShutdownRequest::Derived(Box::new(args.server.shutdown_broadcast.subscribe())),
150 ShutdownRequest::Derived(Box::new(args.shutdown.clone())),
151 ]);
152
153 {
154 print_listening(&args.log, &args.tunnel.name);
155 let mut status = args.server.current_status.lock().unwrap();
156 *status = Some(StatusInfo {
157 name: args.tunnel.name.clone(),
158 lock: args.tunnel.status(),
159 })
160 }
161
162 let serve_fut = super::serve(
163 &args.log,
164 args.tunnel,
165 args.paths,
166 args.code_server_args,
167 args.platform,
168 shutdown_rx,
169 );
170
171 pin!(serve_fut);
172
173 match futures::future::select(Pin::new(&mut args.server.fut), &mut serve_fut).await {
174 Either::Left((rpc_result, fut)) => {
175 rpc_result.unwrap()?;
179 fut.await
180 }
181 Either::Right((ctrl_result, _)) => ctrl_result,
182 }
183}
184
185async fn serve_singleton_rpc<C: Clone + Send + Sync + 'static>(
186 log_broadcast: BroadcastLogSink,
187 mut server: SingletonServer,
188 dispatcher: RpcDispatcher<JsonRpcSerializer, C>,
189 shutdown_rx: Barrier<ShutdownSignal>,
190) -> Result<(), CodeError> {
191 let mut own_shutdown = shutdown_rx.clone();
192 let shutdown_fut = own_shutdown.wait();
193 pin!(shutdown_fut);
194
195 loop {
196 let cnx = tokio::select! {
197 c = server.accept() => c?,
198 _ = &mut shutdown_fut => return Ok(()),
199 };
200
201 let (read, write) = socket_stream_split(cnx);
202 let dispatcher = dispatcher.clone();
203 let msg_rx = log_broadcast.replay_and_subscribe();
204 let shutdown_rx = shutdown_rx.clone();
205 tokio::spawn(async move {
206 let _ = start_json_rpc(dispatcher.clone(), read, write, msg_rx, shutdown_rx).await;
207 });
208 }
209}
210
211#[derive(Clone)]
216pub struct BroadcastLogSink {
217 recent: Arc<Mutex<RingBuffer<Vec<u8>>>>,
218 tx: broadcast::Sender<Vec<u8>>,
219}
220
221impl Default for BroadcastLogSink {
222 fn default() -> Self {
223 Self::new()
224 }
225}
226
227impl BroadcastLogSink {
228 pub fn new() -> Self {
229 let (tx, _) = broadcast::channel(64);
230 Self {
231 tx,
232 recent: Arc::new(Mutex::new(RingBuffer::new(50))),
233 }
234 }
235
236 pub fn get_brocaster(&self) -> broadcast::Sender<Vec<u8>> {
237 self.tx.clone()
238 }
239
240 fn replay_and_subscribe(
241 &self,
242 ) -> ConcatReceivable<Vec<u8>, mpsc::UnboundedReceiver<Vec<u8>>, broadcast::Receiver<Vec<u8>>> {
243 let (log_replay_tx, log_replay_rx) = mpsc::unbounded_channel();
244
245 for log in self.recent.lock().unwrap().iter() {
246 let _ = log_replay_tx.send(log.clone());
247 }
248
249 let _ = log_replay_tx.send(RpcCaller::serialize_notify(
250 &JsonRpcSerializer {},
251 protocol::singleton::METHOD_LOG_REPLY_DONE,
252 protocol::EmptyObject {},
253 ));
254
255 ConcatReceivable::new(log_replay_rx, self.tx.subscribe())
256 }
257}
258
259impl log::LogSink for BroadcastLogSink {
260 fn write_log(&self, level: log::Level, prefix: &str, message: &str) {
261 let s = JsonRpcSerializer {};
262 let serialized = RpcCaller::serialize_notify(
263 &s,
264 protocol::singleton::METHOD_LOG,
265 protocol::singleton::LogMessage {
266 level: Some(level),
267 prefix,
268 message,
269 },
270 );
271
272 let _ = self.tx.send(serialized.clone());
273 self.recent.lock().unwrap().push(serialized);
274 }
275
276 fn write_result(&self, message: &str) {
277 self.write_log(log::Level::Info, "", message);
278 }
279}