cli/tunnels/singleton_server.rs

/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

use std::{
	pin::Pin,
	sync::{Arc, Mutex},
};

use super::{
	code_server::CodeServerArgs,
	control_server::ServerTermination,
	dev_tunnels::{ActiveTunnel, StatusLock},
	protocol,
	shutdown_signal::{ShutdownRequest, ShutdownSignal},
};
use crate::{
	async_pipe::socket_stream_split,
	json_rpc::{new_json_rpc, start_json_rpc, JsonRpcSerializer},
	log,
	rpc::{RpcCaller, RpcDispatcher},
	singleton::SingletonServer,
	state::LauncherPaths,
	tunnels::code_server::print_listening,
	update_service::Platform,
	util::{
		errors::{AnyError, CodeError},
		ring_buffer::RingBuffer,
		sync::{Barrier, ConcatReceivable},
	},
};
use futures::future::Either;
use tokio::{
	pin,
	sync::{broadcast, mpsc},
	task::JoinHandle,
};

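/// Arguments needed for a single run of the singleton's tunnel, consumed by
/// `start_singleton_server`.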
pub struct SingletonServerArgs<'a> {
	pub server: &'a mut RpcServer,
	pub log: log::Logger,
	pub tunnel: ActiveTunnel,
	pub paths: &'a LauncherPaths,
	pub code_server_args: &'a CodeServerArgs,
	pub platform: Platform,
	pub shutdown: Barrier<ShutdownSignal>,
	pub log_broadcast: &'a BroadcastLogSink,
}

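/// The active tunnel's name, paired with a lock from which its current
/// status can be read.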
struct StatusInfo {
	name: String,
	lock: StatusLock,
}

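/// Shared context handed to each RPC method registered in
/// `make_singleton_server`.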
#[derive(Clone)]
struct SingletonServerContext {
	log: log::Logger,
	shutdown_tx: broadcast::Sender<ShutdownSignal>,
	broadcast_tx: broadcast::Sender<Vec<u8>>,
	// Admittedly ugly: a lock inside a lock. `current_status` can be filled
	// in only after the tunnel is set up, yet the tunnel is created after
	// the singleton server starts so that there is no gap in singleton
	// availability. This is safe in practice, since the outer lock is held
	// only for brief reads (in the `status` method).
	current_status: Arc<Mutex<Option<StatusInfo>>>,
}

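/// Handle to the spawned singleton RPC server: the join handle for its
/// accept loop, a sender for broadcasting shutdown signals to clients, and
/// the status slot that `start_singleton_server` fills once a tunnel is up.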
pub struct RpcServer {
	fut: JoinHandle<Result<(), CodeError>>,
	shutdown_broadcast: broadcast::Sender<ShutdownSignal>,
	current_status: Arc<Mutex<Option<StatusInfo>>>,
}

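/// Builds the singleton RPC server, registering its restart, status, and
/// shutdown methods, and spawns its accept loop.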
pub fn make_singleton_server(
	log_broadcast: BroadcastLogSink,
	log: log::Logger,
	server: SingletonServer,
	shutdown_rx: Barrier<ShutdownSignal>,
) -> RpcServer {
	let (shutdown_broadcast, _) = broadcast::channel(4);
	let rpc = new_json_rpc();

	let current_status = Arc::new(Mutex::default());
	let mut rpc = rpc.methods(SingletonServerContext {
		log: log.clone(),
		shutdown_tx: shutdown_broadcast.clone(),
		broadcast_tx: log_broadcast.get_broadcaster(),
		current_status: current_status.clone(),
	});

	rpc.register_sync(
		protocol::singleton::METHOD_RESTART,
		|_: protocol::EmptyObject, ctx| {
			info!(ctx.log, "restarting tunnel after client request");
			let _ = ctx.shutdown_tx.send(ShutdownSignal::RpcRestartRequested);
			Ok(())
		},
	);

	rpc.register_sync(
		protocol::singleton::METHOD_STATUS,
		|_: protocol::EmptyObject, ctx| {
			Ok(ctx.current_status
				.lock()
				.unwrap()
				.as_ref()
				.map(|s| protocol::singleton::StatusWithTunnelName {
					name: Some(s.name.clone()),
					status: s.lock.read(),
				})
				.unwrap_or_default())
		},
	);

	rpc.register_sync(
		protocol::singleton::METHOD_SHUTDOWN,
		|_: protocol::EmptyObject, ctx| {
			info!(
				ctx.log,
				"closing tunnel and all clients after a shutdown request"
			);
			let _ = ctx.broadcast_tx.send(RpcCaller::serialize_notify(
				&JsonRpcSerializer {},
				protocol::singleton::METHOD_SHUTDOWN,
				protocol::EmptyObject {},
			));
			let _ = ctx.shutdown_tx.send(ShutdownSignal::RpcShutdownRequested);
			Ok(())
		},
	);

	// We spawn the accept loop rather than returning a future, since it must
	// keep making progress even outside the start_singleton_server loop
	// (i.e. while the tunnel restarts).
	let fut = tokio::spawn(async move {
		serve_singleton_rpc(log_broadcast, server, rpc.build(log), shutdown_rx).await
	});
	RpcServer {
		shutdown_broadcast,
		current_status,
		fut,
	}
}

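/// Serves the given tunnel, publishing its name and status to the RPC
/// server, until either the control server finishes or the RPC server shuts
/// down or fails.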
pub async fn start_singleton_server(
	args: SingletonServerArgs<'_>,
) -> Result<ServerTermination, AnyError> {
	let shutdown_rx = ShutdownRequest::create_rx([
		ShutdownRequest::Derived(Box::new(args.server.shutdown_broadcast.subscribe())),
		ShutdownRequest::Derived(Box::new(args.shutdown.clone())),
	]);

	{
		print_listening(&args.log, &args.tunnel.name);
		let mut status = args.server.current_status.lock().unwrap();
		*status = Some(StatusInfo {
			name: args.tunnel.name.clone(),
			lock: args.tunnel.status(),
		})
	}

	let serve_fut = super::serve(
		&args.log,
		args.tunnel,
		args.paths,
		args.code_server_args,
		args.platform,
		shutdown_rx,
	);

	pin!(serve_fut);

	match futures::future::select(Pin::new(&mut args.server.fut), &mut serve_fut).await {
		Either::Left((rpc_result, fut)) => {
			// The RPC server ends only on a graceful shutdown or with an
			// error. Propagate any error, then return the result of the
			// control server's eventual shutdown.
			rpc_result.unwrap()?;
			fut.await
		}
		Either::Right((ctrl_result, _)) => ctrl_result,
	}
}

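/// Accept loop for the singleton socket: each incoming connection gets its
/// own JSON-RPC session, seeded with a replay of recent log events.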
async fn serve_singleton_rpc<C: Clone + Send + Sync + 'static>(
	log_broadcast: BroadcastLogSink,
	mut server: SingletonServer,
	dispatcher: RpcDispatcher<JsonRpcSerializer, C>,
	shutdown_rx: Barrier<ShutdownSignal>,
) -> Result<(), CodeError> {
	let mut own_shutdown = shutdown_rx.clone();
	let shutdown_fut = own_shutdown.wait();
	pin!(shutdown_fut);

	loop {
		let cnx = tokio::select! {
			c = server.accept() => c?,
			_ = &mut shutdown_fut => return Ok(()),
		};

		let (read, write) = socket_stream_split(cnx);
		let dispatcher = dispatcher.clone();
		let msg_rx = log_broadcast.replay_and_subscribe();
		let shutdown_rx = shutdown_rx.clone();
		tokio::spawn(async move {
			let _ = start_json_rpc(dispatcher.clone(), read, write, msg_rx, shutdown_rx).await;
		});
	}
}

/// Log sink that can broadcast and replay log events. Used for transmitting
/// logs from the singleton to all clients. This should be created and injected
/// into other services, like the tunnel, before `start_singleton_server`
/// is called.
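///
/// A minimal sketch of the intended wiring; `make_logger`, `singleton`,
/// `tunnel`, and the other inputs here are hypothetical stand-ins for values
/// produced elsewhere in the crate:
///
/// ```ignore
/// let log_broadcast = BroadcastLogSink::new();
/// // Hypothetical: build a Logger that tees its output into the sink.
/// let log = make_logger(log_broadcast.clone());
/// let mut rpc = make_singleton_server(log_broadcast.clone(), log.clone(), singleton, shutdown.clone());
/// let termination = start_singleton_server(SingletonServerArgs {
///     server: &mut rpc,
///     log,
///     tunnel,
///     paths: &paths,
///     code_server_args: &code_server_args,
///     platform,
///     shutdown,
///     log_broadcast: &log_broadcast,
/// })
/// .await?;
/// ```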
#[derive(Clone)]
pub struct BroadcastLogSink {
	recent: Arc<Mutex<RingBuffer<Vec<u8>>>>,
	tx: broadcast::Sender<Vec<u8>>,
}

impl Default for BroadcastLogSink {
	fn default() -> Self {
		Self::new()
	}
}

impl BroadcastLogSink {
	pub fn new() -> Self {
		let (tx, _) = broadcast::channel(64);
		Self {
			tx,
			recent: Arc::new(Mutex::new(RingBuffer::new(50))),
		}
	}

	pub fn get_broadcaster(&self) -> broadcast::Sender<Vec<u8>> {
		self.tx.clone()
	}

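	/// Returns a receiver that replays buffered recent log events, emits a
	/// `METHOD_LOG_REPLY_DONE` marker, and then yields live broadcast events.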
	fn replay_and_subscribe(
		&self,
	) -> ConcatReceivable<Vec<u8>, mpsc::UnboundedReceiver<Vec<u8>>, broadcast::Receiver<Vec<u8>>> {
		let (log_replay_tx, log_replay_rx) = mpsc::unbounded_channel();

		for log in self.recent.lock().unwrap().iter() {
			let _ = log_replay_tx.send(log.clone());
		}

		let _ = log_replay_tx.send(RpcCaller::serialize_notify(
			&JsonRpcSerializer {},
			protocol::singleton::METHOD_LOG_REPLY_DONE,
			protocol::EmptyObject {},
		));

		ConcatReceivable::new(log_replay_rx, self.tx.subscribe())
	}
}

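// Serializes each log record as a JSON-RPC notification, broadcasting it to
// connected clients and buffering it for replay to late joiners.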
impl log::LogSink for BroadcastLogSink {
	fn write_log(&self, level: log::Level, prefix: &str, message: &str) {
		let s = JsonRpcSerializer {};
		let serialized = RpcCaller::serialize_notify(
			&s,
			protocol::singleton::METHOD_LOG,
			protocol::singleton::LogMessage {
				level: Some(level),
				prefix,
				message,
			},
		);

		let _ = self.tx.send(serialized.clone());
		self.recent.lock().unwrap().push(serialized);
	}

	fn write_result(&self, message: &str) {
		self.write_log(log::Level::Info, "", message);
	}
}