cli/tunnels/
singleton_client.rs

1/*---------------------------------------------------------------------------------------------
2 *  Copyright (c) Microsoft Corporation. All rights reserved.
3 *  Licensed under the MIT License. See License.txt in the project root for license information.
4 *--------------------------------------------------------------------------------------------*/
5
6use std::{
7	path::Path,
8	sync::{
9		atomic::{AtomicBool, Ordering},
10		Arc,
11	},
12	thread,
13};
14
15use const_format::concatcp;
16use tokio::sync::mpsc;
17
18use crate::{
19	async_pipe::{socket_stream_split, AsyncPipe},
20	constants::IS_INTERACTIVE_CLI,
21	json_rpc::{new_json_rpc, start_json_rpc, JsonRpcSerializer},
22	log,
23	rpc::RpcCaller,
24	singleton::connect_as_client,
25	tunnels::{code_server::print_listening, protocol::EmptyObject},
26	util::{errors::CodeError, sync::Barrier},
27};
28
29use super::{
30	protocol,
31	shutdown_signal::{ShutdownRequest, ShutdownSignal},
32};
33
34pub struct SingletonClientArgs {
35	pub log: log::Logger,
36	pub stream: AsyncPipe,
37	pub shutdown: Barrier<ShutdownSignal>,
38}
39
40struct SingletonServerContext {
41	log: log::Logger,
42	exit_entirely: Arc<AtomicBool>,
43	caller: RpcCaller<JsonRpcSerializer>,
44}
45
46const CONTROL_INSTRUCTIONS_COMMON: &str =
47	"Connected to an existing tunnel process running on this machine.";
48
49const CONTROL_INSTRUCTIONS_INTERACTIVE: &str = concatcp!(
50	CONTROL_INSTRUCTIONS_COMMON,
51	" You can press:
52
53- \"x\" + Enter to stop the tunnel and exit
54- \"r\" + Enter to restart the tunnel
55- Ctrl+C to detach
56"
57);
58
59/// Serves a client singleton. Returns true if the process should exit after
60/// this returns, instead of trying to start a tunnel.
61pub async fn start_singleton_client(args: SingletonClientArgs) -> bool {
62	let mut rpc = new_json_rpc();
63	let (msg_tx, msg_rx) = mpsc::unbounded_channel();
64	let exit_entirely = Arc::new(AtomicBool::new(false));
65
66	debug!(
67		args.log,
68		"An existing tunnel is running on this machine, connecting to it..."
69	);
70
71	if *IS_INTERACTIVE_CLI {
72		let stdin_handle = rpc.get_caller(msg_tx.clone());
73		thread::spawn(move || {
74			let mut input = String::new();
75			loop {
76				input.truncate(0);
77				match std::io::stdin().read_line(&mut input) {
78					Err(_) | Ok(0) => return, // EOF or not a tty
79					_ => {}
80				};
81
82				match input.chars().next().map(|c| c.to_ascii_lowercase()) {
83					Some('x') => {
84						stdin_handle.notify(protocol::singleton::METHOD_SHUTDOWN, EmptyObject {});
85						return;
86					}
87					Some('r') => {
88						stdin_handle.notify(protocol::singleton::METHOD_RESTART, EmptyObject {});
89					}
90					Some(_) | None => {}
91				}
92			}
93		});
94	}
95
96	let caller = rpc.get_caller(msg_tx);
97	let mut rpc = rpc.methods(SingletonServerContext {
98		log: args.log.clone(),
99		exit_entirely: exit_entirely.clone(),
100		caller,
101	});
102
103	rpc.register_sync(protocol::singleton::METHOD_SHUTDOWN, |_: EmptyObject, c| {
104		c.exit_entirely.store(true, Ordering::SeqCst);
105		Ok(())
106	});
107
108	rpc.register_async(
109		protocol::singleton::METHOD_LOG_REPLY_DONE,
110		|_: EmptyObject, c| async move {
111			c.log.result(if *IS_INTERACTIVE_CLI {
112				CONTROL_INSTRUCTIONS_INTERACTIVE
113			} else {
114				CONTROL_INSTRUCTIONS_COMMON
115			});
116
117			let res = c
118				.caller
119				.call::<_, _, protocol::singleton::StatusWithTunnelName>(
120					protocol::singleton::METHOD_STATUS,
121					protocol::EmptyObject {},
122				);
123
124			// we want to ensure the "listening" string always gets printed for
125			// consumers (i.e. VS Code). Ask for it. If the tunnel is not currently
126			// connected though, it will be soon, and that'll be in the log replays.
127			if let Ok(Ok(s)) = res.await {
128				if let Some(name) = s.name {
129					print_listening(&c.log, &name);
130				}
131			}
132
133			Ok(())
134		},
135	);
136
137	rpc.register_sync(
138		protocol::singleton::METHOD_LOG,
139		|log: protocol::singleton::LogMessageOwned, c| {
140			match log.level {
141				Some(level) => c.log.emit(level, &format!("{}{}", log.prefix, log.message)),
142				None => c.log.result(format!("{}{}", log.prefix, log.message)),
143			}
144			Ok(())
145		},
146	);
147
148	let (read, write) = socket_stream_split(args.stream);
149	let _ = start_json_rpc(rpc.build(args.log), read, write, msg_rx, args.shutdown).await;
150
151	exit_entirely.load(Ordering::SeqCst)
152}
153
154pub async fn do_single_rpc_call<
155	P: serde::Serialize + 'static,
156	R: serde::de::DeserializeOwned + Send + 'static,
157>(
158	lock_file: &Path,
159	log: log::Logger,
160	method: &'static str,
161	params: P,
162) -> Result<R, CodeError> {
163	let client = match connect_as_client(lock_file).await {
164		Ok(p) => p,
165		Err(CodeError::SingletonLockfileOpenFailed(_))
166		| Err(CodeError::SingletonLockedProcessExited(_)) => {
167			return Err(CodeError::NoRunningTunnel);
168		}
169		Err(e) => return Err(e),
170	};
171
172	let (msg_tx, msg_rx) = mpsc::unbounded_channel();
173	let mut rpc = new_json_rpc();
174	let caller = rpc.get_caller(msg_tx);
175	let (read, write) = socket_stream_split(client);
176
177	let rpc = tokio::spawn(async move {
178		start_json_rpc(
179			rpc.methods(()).build(log),
180			read,
181			write,
182			msg_rx,
183			ShutdownRequest::create_rx([ShutdownRequest::CtrlC]),
184		)
185		.await
186		.unwrap();
187	});
188
189	let r = caller.call(method, params).await.unwrap();
190	rpc.abort();
191	r.map_err(CodeError::TunnelRpcCallFailed)
192}