cli/commands/tunnels.rs

/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

use async_trait::async_trait;
use base64::{engine::general_purpose as b64, Engine as _};
use futures::{stream::FuturesUnordered, StreamExt};
use serde::Serialize;
use sha2::{Digest, Sha256};
use std::{
	net::{IpAddr, Ipv4Addr, SocketAddr},
	str::FromStr,
	time::Duration,
};
use sysinfo::Pid;
use tokio::{
	io::{AsyncBufReadExt, BufReader},
	sync::watch,
};

use super::{
	args::{
		AuthProvider, CliCore, CommandShellArgs, ExistingTunnelArgs, TunnelForwardArgs,
		TunnelRenameArgs, TunnelServeArgs, TunnelServiceSubCommands, TunnelUserSubCommands,
	},
	CommandContext,
};

use crate::{
	async_pipe::{get_socket_name, listen_socket_rw_stream, AsyncRWAccepter},
	auth::Auth,
	constants::{
		APPLICATION_NAME, CONTROL_PORT, IS_A_TTY, TUNNEL_CLI_LOCK_NAME, TUNNEL_SERVICE_LOCK_NAME,
	},
	log,
	state::LauncherPaths,
	tunnels::{
		code_server::CodeServerArgs,
		create_service_manager,
		dev_tunnels::{self, DevTunnels},
		legal, local_forwarding,
		paths::get_all_servers,
		protocol, serve_stream,
		shutdown_signal::ShutdownRequest,
		singleton_client::do_single_rpc_call,
		singleton_server::{
			make_singleton_server, start_singleton_server, BroadcastLogSink, SingletonServerArgs,
		},
		AuthRequired, Next, ServeStreamParams, ServiceContainer, ServiceManager,
	},
	util::{
		app_lock::AppMutex,
		command::new_std_command,
		errors::{wrap, AnyError, CodeError},
		machine::canonical_exe,
		prereqs::PreReqChecker,
	},
};
use crate::{
	singleton::{acquire_singleton, SingletonConnection},
	tunnels::{
		dev_tunnels::ActiveTunnel,
		singleton_client::{start_singleton_client, SingletonClientArgs},
		SleepInhibitor,
	},
};

impl From<AuthProvider> for crate::auth::AuthProvider {
	fn from(auth_provider: AuthProvider) -> Self {
		match auth_provider {
			AuthProvider::Github => crate::auth::AuthProvider::Github,
			AuthProvider::Microsoft => crate::auth::AuthProvider::Microsoft,
		}
	}
}

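/// Builds a `dev_tunnels::ExistingTunnel` from the "attach to existing tunnel"
/// arguments. The cluster may be passed separately or embedded in the tunnel ID
/// as `<id>.<cluster>`; returns `None` if the argument combination is incomplete.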
fn fulfill_existing_tunnel_args(
	d: ExistingTunnelArgs,
	name_arg: &Option<String>,
) -> Option<dev_tunnels::ExistingTunnel> {
	let tunnel_name = d.tunnel_name.or_else(|| name_arg.clone());

	match (d.tunnel_id, d.cluster, d.host_token) {
		(Some(tunnel_id), None, Some(host_token)) => {
			let i = tunnel_id.find('.')?;
			Some(dev_tunnels::ExistingTunnel {
				tunnel_id: tunnel_id[..i].to_string(),
				cluster: tunnel_id[i + 1..].to_string(),
				tunnel_name,
				host_token,
			})
		}

		(Some(tunnel_id), Some(cluster), Some(host_token)) => Some(dev_tunnels::ExistingTunnel {
			tunnel_id,
			tunnel_name,
			host_token,
			cluster,
		}),

		_ => None,
	}
}

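/// `ServiceContainer` used when the CLI runs as a background service: it serves
/// with `random_name: true` so the service never blocks on an interactive prompt.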
struct TunnelServiceContainer {
	args: CliCore,
}

impl TunnelServiceContainer {
	fn new(args: CliCore) -> Self {
		Self { args }
	}
}

#[async_trait]
impl ServiceContainer for TunnelServiceContainer {
	async fn run_service(
		&mut self,
		log: log::Logger,
		launcher_paths: LauncherPaths,
	) -> Result<(), AnyError> {
		let csa = (&self.args).into();
		serve_with_csa(
			launcher_paths,
			log,
			TunnelServeArgs {
				random_name: true, // avoid prompting
				..Default::default()
			},
			csa,
			TUNNEL_SERVICE_LOCK_NAME,
		)
		.await?;
		Ok(())
	}
}

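/// Runs the `command-shell` server, accepting control connections over
/// stdin/stderr, a localhost TCP port, or a named socket, and requiring
/// VSDA-based auth (optionally with a preshared token).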
pub async fn command_shell(ctx: CommandContext, args: CommandShellArgs) -> Result<i32, AnyError> {
	let platform = PreReqChecker::new().verify().await?;
	let mut shutdown_reqs = vec![ShutdownRequest::CtrlC];
	if let Some(p) = args.parent_process_id.and_then(|p| Pid::from_str(&p).ok()) {
		shutdown_reqs.push(ShutdownRequest::ParentProcessKilled(p));
	}

	let mut params = ServeStreamParams {
		log: ctx.log,
		launcher_paths: ctx.paths,
		platform,
		requires_auth: args
			.require_token
			.map(AuthRequired::VSDAWithToken)
			.unwrap_or(AuthRequired::VSDA),
		exit_barrier: ShutdownRequest::create_rx(shutdown_reqs),
		code_server_args: (&ctx.args).into(),
	};

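	// Decide how connections are accepted: a named socket, a localhost TCP port,
	// or (by default) a single session over stdin/stderr.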
	let mut listener: Box<dyn AsyncRWAccepter> = match (args.on_port, args.on_socket) {
		(_, true) => {
			let socket = get_socket_name();
			let listener = listen_socket_rw_stream(&socket)
				.await
				.map_err(|e| wrap(e, "error listening on socket"))?;

			params
				.log
				.result(format!("Listening on {}", socket.display()));

			Box::new(listener)
		}
		(Some(p), _) => {
			let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), p);
			let listener = tokio::net::TcpListener::bind(addr)
				.await
				.map_err(|e| wrap(e, "error listening on port"))?;

			params
				.log
				.result(format!("Listening on {}", listener.local_addr().unwrap()));

			Box::new(listener)
		}
		_ => {
			serve_stream(tokio::io::stdin(), tokio::io::stderr(), params).await;
			return Ok(0);
		}
	};

	let mut servers = FuturesUnordered::new();

	loop {
		tokio::select! {
			Some(_) = servers.next() => {},
			socket = listener.accept_rw() => {
				match socket {
					Ok((read, write)) => servers.push(serve_stream(read, write, params.clone())),
					Err(e) => {
						error!(params.log, &format!("Error accepting connection: {}", e));
						return Ok(1);
					}
				}
			},
			_ = params.exit_barrier.wait() => {
				// wait for all servers to finish up:
				while (servers.next().await).is_some() { }
				return Ok(0);
			}
		}
	}
}

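/// Handles the `tunnel service` subcommands: installing and registering the
/// background service, uninstalling it, showing its logs, and the internal run
/// mode invoked by the registered service itself.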
pub async fn service(
	ctx: CommandContext,
	service_args: TunnelServiceSubCommands,
) -> Result<i32, AnyError> {
	let manager = create_service_manager(ctx.log.clone(), &ctx.paths);
	match service_args {
		TunnelServiceSubCommands::Install(args) => {
			let auth = Auth::new(&ctx.paths, ctx.log.clone());

			if let Some(name) = &args.name {
				// ensure the name matches, and tunnel exists
				dev_tunnels::DevTunnels::new_remote_tunnel(&ctx.log, auth, &ctx.paths)
					.rename_tunnel(name)
					.await?;
			} else {
				// still ensure they're logged in, otherwise subsequent serving will fail
				auth.get_credential().await?;
			}

			// likewise for license consent
			legal::require_consent(&ctx.paths, args.accept_server_license_terms)?;

			let current_exe = canonical_exe().map_err(|e| wrap(e, "could not get current exe"))?;

			manager
				.register(
					current_exe,
					&[
						"--verbose",
						"--cli-data-dir",
						ctx.paths.root().as_os_str().to_string_lossy().as_ref(),
						"tunnel",
						"service",
						"internal-run",
					],
				)
				.await?;
			ctx.log.result(format!("Service successfully installed! You can use `{} tunnel service log` to monitor it, and `{} tunnel service uninstall` to remove it.", APPLICATION_NAME, APPLICATION_NAME));
		}
		TunnelServiceSubCommands::Uninstall => {
			manager.unregister().await?;
		}
		TunnelServiceSubCommands::Log => {
			manager.show_logs().await?;
		}
		TunnelServiceSubCommands::InternalRun => {
			manager
				.run(ctx.paths.clone(), TunnelServiceContainer::new(ctx.args))
				.await?;
		}
	}

	Ok(0)
}

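/// Handles `tunnel user` subcommands: login, logout, and showing whether a
/// credential is currently stored.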
pub async fn user(ctx: CommandContext, user_args: TunnelUserSubCommands) -> Result<i32, AnyError> {
	let auth = Auth::new(&ctx.paths, ctx.log.clone());
	match user_args {
		TunnelUserSubCommands::Login(login_args) => {
			auth.login(
				login_args.provider.map(|p| p.into()),
				login_args.access_token.to_owned(),
			)
			.await?;
		}
		TunnelUserSubCommands::Logout => {
			auth.clear_credentials()?;
		}
		TunnelUserSubCommands::Show => {
			if let Ok(Some(_)) = auth.get_current_credential() {
				ctx.log.result("logged in");
			} else {
				ctx.log.result("not logged in");
				return Ok(1);
			}
		}
	}

	Ok(0)
}

/// Renames the tunnel used by this machine, if any.
pub async fn rename(ctx: CommandContext, rename_args: TunnelRenameArgs) -> Result<i32, AnyError> {
	let auth = Auth::new(&ctx.paths, ctx.log.clone());
	let mut dt = dev_tunnels::DevTunnels::new_remote_tunnel(&ctx.log, auth, &ctx.paths);
	dt.rename_tunnel(&rename_args.name).await?;
	ctx.log.result(format!(
		"Successfully renamed this tunnel to {}",
		&rename_args.name
	));

	Ok(0)
}

/// Removes the tunnel registered for this machine, if any.
pub async fn unregister(ctx: CommandContext) -> Result<i32, AnyError> {
	let auth = Auth::new(&ctx.paths, ctx.log.clone());
	let mut dt = dev_tunnels::DevTunnels::new_remote_tunnel(&ctx.log, auth, &ctx.paths);
	dt.remove_tunnel().await?;
	Ok(0)
}

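/// Asks the currently running tunnel singleton, if any, to restart its server.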
pub async fn restart(ctx: CommandContext) -> Result<i32, AnyError> {
	do_single_rpc_call::<_, ()>(
		&ctx.paths.tunnel_lockfile(),
		ctx.log,
		protocol::singleton::METHOD_RESTART,
		protocol::EmptyObject {},
	)
	.await
	.map(|_| 0)
	.map_err(|e| e.into())
}

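/// Asks the currently running tunnel singleton, if any, to shut down.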
pub async fn kill(ctx: CommandContext) -> Result<i32, AnyError> {
	do_single_rpc_call::<_, ()>(
		&ctx.paths.tunnel_lockfile(),
		ctx.log,
		protocol::singleton::METHOD_SHUTDOWN,
		protocol::EmptyObject {},
	)
	.await
	.map(|_| 0)
	.map_err(|e| e.into())
}

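/// Output of the `status` command: the running singleton's status, if any, and
/// whether the background service is installed.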
#[derive(Serialize)]
pub struct StatusOutput {
	pub tunnel: Option<protocol::singleton::StatusWithTunnelName>,
	pub service_installed: bool,
}

pub async fn status(ctx: CommandContext) -> Result<i32, AnyError> {
	let tunnel = do_single_rpc_call::<_, protocol::singleton::StatusWithTunnelName>(
		&ctx.paths.tunnel_lockfile(),
		ctx.log.clone(),
		protocol::singleton::METHOD_STATUS,
		protocol::EmptyObject {},
	)
	.await;

	let service_installed = create_service_manager(ctx.log.clone(), &ctx.paths)
		.is_installed()
		.await
		.unwrap_or(false);

	ctx.log.result(
		serde_json::to_string(&StatusOutput {
			service_installed,
			tunnel: match tunnel {
				Ok(s) => Some(s),
				Err(CodeError::NoRunningTunnel | CodeError::AsyncPipeFailed(_)) => None,
				Err(e) => return Err(e.into()),
			},
		})
		.unwrap(),
	);

	Ok(0)
}

/// Removes unused servers.
pub async fn prune(ctx: CommandContext) -> Result<i32, AnyError> {
	get_all_servers(&ctx.paths)
		.into_iter()
		.map(|s| s.server_paths(&ctx.paths))
		.filter(|s| s.get_running_pid().is_none())
		.try_for_each(|s| {
			ctx.log
				.result(format!("Deleted {}", s.server_dir.display()));
			s.delete()
		})
		.map_err(AnyError::from)?;

	ctx.log.result("Successfully removed all unused servers");

	Ok(0)
}

/// Starts the gateway server.
pub async fn serve(ctx: CommandContext, gateway_args: TunnelServeArgs) -> Result<i32, AnyError> {
	let CommandContext {
		log, paths, args, ..
	} = ctx;

	let no_sleep = match gateway_args.no_sleep.then(SleepInhibitor::new) {
		Some(i) => match i.await {
			Ok(i) => Some(i),
			Err(e) => {
				warning!(log, "Could not inhibit sleep: {}", e);
				None
			}
		},
		None => None,
	};

	legal::require_consent(&paths, gateway_args.accept_server_license_terms)?;

	let mut csa = (&args).into();
	gateway_args.apply_to_server_args(&mut csa);
	let result = serve_with_csa(paths, log, gateway_args, csa, TUNNEL_CLI_LOCK_NAME).await;
	drop(no_sleep);

	result
}

/// Internal command used by port forwarding. It reads requests for forwarded ports
/// on lines from stdin, as JSON. It uses singleton logic as well (though on a
/// different singleton lockfile than the one used for the control server) so that
/// all forward requests on a single machine go through a single hosted tunnel
/// process. Without singleton logic, requests could get routed to processes
/// that aren't forwarding a given port and then fail.
pub async fn forward(
	ctx: CommandContext,
	mut forward_args: TunnelForwardArgs,
) -> Result<i32, AnyError> {
	// Spooky: check IS_A_TTY before starting the stdin reader, since IS_A_TTY will
	// access stdin but a lock will later be held on stdin by the line-reader.
	if *IS_A_TTY {
		trace!(ctx.log, "port forwarding is an internal preview feature");
	}

	// #region stdin reading logic:
	let (own_ports_tx, own_ports_rx) = watch::channel(vec![]);
	let ports_process_log = ctx.log.clone();
	tokio::spawn(async move {
		let mut lines = BufReader::new(tokio::io::stdin()).lines();
		while let Ok(Some(line)) = lines.next_line().await {
			match serde_json::from_str(&line) {
				Ok(p) => {
					let _ = own_ports_tx.send(p);
				}
				Err(e) => warning!(ports_process_log, "error parsing ports: {}", e),
			}
		}
	});

	// #region singleton acquisition
	let shutdown = ShutdownRequest::create_rx([ShutdownRequest::CtrlC]);
	let server = loop {
		if shutdown.is_open() {
			return Ok(0);
		}

		match acquire_singleton(&ctx.paths.forwarding_lockfile()).await {
			Ok(SingletonConnection::Client(stream)) => {
				debug!(ctx.log, "starting as client to singleton");
				let r = local_forwarding::client(local_forwarding::SingletonClientArgs {
					log: ctx.log.clone(),
					shutdown: shutdown.clone(),
					stream,
					port_requests: own_ports_rx.clone(),
				})
				.await;
				if let Err(e) = r {
					warning!(ctx.log, "error contacting forwarding singleton: {}", e);
				}
			}
			Ok(SingletonConnection::Singleton(server)) => break server,
			Err(e) => {
				warning!(ctx.log, "error accessing singleton, retrying: {}", e);
				tokio::time::sleep(Duration::from_secs(2)).await
			}
		}
	};

	// #region singleton handler
	let auth = Auth::new(&ctx.paths, ctx.log.clone());
	if let (Some(p), Some(at)) = (
		forward_args.login.provider.take(),
		forward_args.login.access_token.take(),
	) {
		auth.login(Some(p.into()), Some(at)).await?;
	}

	let mut tunnels = DevTunnels::new_port_forwarding(&ctx.log, auth, &ctx.paths);
	let tunnel = tunnels
		.start_new_launcher_tunnel(None, true, &forward_args.ports)
		.await?;

	local_forwarding::server(ctx.log, tunnel, server, own_ports_rx, shutdown).await?;

	Ok(0)
}

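/// Derives the code server connection token by SHA-256 hashing the tunnel ID
/// and encoding the digest as unpadded URL-safe base64.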
fn get_connection_token(tunnel: &ActiveTunnel) -> String {
	let mut hash = Sha256::new();
	hash.update(tunnel.id.as_bytes());
	let result = hash.finalize();
	b64::URL_SAFE_NO_PAD.encode(result)
}

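/// Shared implementation behind `serve` and the background service: tees logs
/// into a broadcast sink, wires up shutdown triggers, acquires the tunnel
/// singleton (or defers to an existing one), and then runs the tunnel until it
/// exits, restarts, or respawns after an update.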
async fn serve_with_csa(
	paths: LauncherPaths,
	mut log: log::Logger,
	gateway_args: TunnelServeArgs,
	mut csa: CodeServerArgs,
	app_mutex_name: Option<&'static str>,
) -> Result<i32, AnyError> {
	let log_broadcast = BroadcastLogSink::new();
	log = log.tee(log_broadcast.clone());
	log::install_global_logger(log.clone()); // re-install so that library logs are captured

	debug!(
		log,
		"Starting tunnel with `{} {}`",
		APPLICATION_NAME,
		std::env::args().collect::<Vec<_>>().join(" ")
	);

	// Intentionally read before starting the server. If the server updated and
	// respawn is requested, the old binary will get renamed, and then
	// current_exe will point to the wrong path.
	let current_exe = std::env::current_exe().unwrap();

	let mut vec = vec![
		ShutdownRequest::CtrlC,
		ShutdownRequest::ExeUninstalled(current_exe.to_owned()),
	];
	if let Some(p) = gateway_args
		.parent_process_id
		.and_then(|p| Pid::from_str(&p).ok())
	{
		vec.push(ShutdownRequest::ParentProcessKilled(p));
	}
	let mut shutdown = ShutdownRequest::create_rx(vec);

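	// Become the singleton tunnel process, or, if another instance already holds
	// the lock, attach to it as a client instead.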
	let server = loop {
		if shutdown.is_open() {
			return Ok(0);
		}

		match acquire_singleton(&paths.tunnel_lockfile()).await {
			Ok(SingletonConnection::Client(stream)) => {
				debug!(log, "starting as client to singleton");
				let should_exit = start_singleton_client(SingletonClientArgs {
					log: log.clone(),
					shutdown: shutdown.clone(),
					stream,
				})
				.await;
				if should_exit {
					return Ok(0);
				}
			}
			Ok(SingletonConnection::Singleton(server)) => break server,
			Err(e) => {
				warning!(log, "error accessing singleton, retrying: {}", e);
				tokio::time::sleep(Duration::from_secs(2)).await
			}
		}
	};

	debug!(log, "starting as new singleton");

	let mut server =
		make_singleton_server(log_broadcast.clone(), log.clone(), server, shutdown.clone());
	let platform = spanf!(log, log.span("prereq"), PreReqChecker::new().verify())?;
	let _lock = app_mutex_name.map(AppMutex::new);

	let auth = Auth::new(&paths, log.clone());
	let mut dt = dev_tunnels::DevTunnels::new_remote_tunnel(&log, auth, &paths);
	loop {
		let tunnel = if let Some(t) =
			fulfill_existing_tunnel_args(gateway_args.tunnel.clone(), &gateway_args.name)
		{
			dt.start_existing_tunnel(t).await
		} else {
			tokio::select! {
				t = dt.start_new_launcher_tunnel(gateway_args.name.as_deref(), gateway_args.random_name, &[CONTROL_PORT]) => t,
				_ = shutdown.wait() => return Ok(1),
			}
		}?;

		csa.connection_token = Some(get_connection_token(&tunnel));

		let mut r = start_singleton_server(SingletonServerArgs {
			log: log.clone(),
			tunnel,
			paths: &paths,
			code_server_args: &csa,
			platform,
			log_broadcast: &log_broadcast,
			shutdown: shutdown.clone(),
			server: &mut server,
		})
		.await?;
		r.tunnel.close().await.ok();

		match r.next {
			Next::Respawn => {
				warning!(log, "respawn requested, starting new server");
				// reuse current args, but specify no-forward since tunnels will
				// already be running in this process, and we cannot do a login
				let args = std::env::args().skip(1).collect::<Vec<String>>();
				let exit = new_std_command(current_exe)
					.args(args)
					.spawn()
					.map_err(|e| wrap(e, "error respawning after update"))?
					.wait()
					.map_err(|e| wrap(e, "error waiting for child"))?;

				return Ok(exit.code().unwrap_or(1));
			}
			Next::Exit => {
				debug!(log, "Tunnel shut down");
				return Ok(0);
			}
			Next::Restart => continue,
		}
	}
}