cli/tunnels/control_server.rs

/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/
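//! RPC control server for tunnel connections. Accepts control sockets from the
//! active tunnel (or an arbitrary stream), optionally authenticates the client,
//! and dispatches msgpack-RPC requests: installing and serving the VS Code
//! server, port forwarding, filesystem helpers, process spawning, and updates.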
use crate::async_pipe::get_socket_rw_stream;
use crate::constants::{CONTROL_PORT, PRODUCT_NAME_LONG};
use crate::log;
use crate::msgpack_rpc::{new_msgpack_rpc, start_msgpack_rpc, MsgPackCodec, MsgPackSerializer};
use crate::options::Quality;
use crate::rpc::{MaybeSync, RpcBuilder, RpcCaller, RpcDispatcher};
use crate::self_update::SelfUpdate;
use crate::state::LauncherPaths;
use crate::tunnels::protocol::{HttpRequestParams, METHOD_CHALLENGE_ISSUE};
use crate::tunnels::socket_signal::CloseReason;
use crate::update_service::{Platform, Release, TargetKind, UpdateService};
use crate::util::command::new_tokio_command;
use crate::util::errors::{
	wrap, AnyError, CodeError, MismatchedLaunchModeError, NoAttachedServerError,
};
use crate::util::http::{
	DelegatedHttpRequest, DelegatedSimpleHttp, FallbackSimpleHttp, ReqwestSimpleHttp,
};
use crate::util::io::SilentCopyProgress;
use crate::util::is_integrated_cli;
use crate::util::machine::kill_pid;
use crate::util::os::os_release;
use crate::util::sync::{new_barrier, Barrier, BarrierOpener};

use futures::stream::FuturesUnordered;
use futures::FutureExt;
use opentelemetry::trace::SpanKind;
use opentelemetry::KeyValue;
use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Stdio;
use tokio::net::TcpStream;
use tokio::pin;
use tokio::process::{ChildStderr, ChildStdin};
use tokio_util::codec::Decoder;

use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, DuplexStream};
use tokio::sync::{mpsc, Mutex};

use super::challenge::{create_challenge, sign_challenge, verify_challenge};
use super::code_server::{
	download_cli_into_cache, AnyCodeServer, CodeServerArgs, ServerBuilder, ServerParamsRaw,
	SocketCodeServer,
};
use super::dev_tunnels::ActiveTunnel;
use super::paths::prune_stopped_servers;
use super::port_forwarder::{PortForwarding, PortForwardingProcessor};
use super::protocol::{
	AcquireCliParams, CallServerHttpParams, CallServerHttpResult, ChallengeIssueParams,
	ChallengeIssueResponse, ChallengeVerifyParams, ClientRequestMethod, EmptyObject, ForwardParams,
	ForwardResult, FsReadDirEntry, FsReadDirResponse, FsRenameRequest, FsSinglePathRequest,
	FsStatResponse, GetEnvResponse, GetHostnameResponse, HttpBodyParams, HttpHeadersParams,
	NetConnectRequest, ServeParams, ServerLog, ServerMessageParams, SpawnParams, SpawnResult,
	SysKillRequest, SysKillResponse, ToClientRequest, UnforwardParams, UpdateParams, UpdateResult,
	VersionResponse, METHOD_CHALLENGE_VERIFY,
};
use super::server_bridge::ServerBridge;
use super::server_multiplexer::ServerMultiplexer;
use super::shutdown_signal::ShutdownSignal;
use super::socket_signal::{
	ClientMessageDecoder, ServerMessageDestination, ServerMessageSink, SocketSignal,
};

type HttpRequestsMap = Arc<std::sync::Mutex<HashMap<u32, DelegatedHttpRequest>>>;
type CodeServerCell = Arc<Mutex<Option<SocketCodeServer>>>;

struct HandlerContext {
	/// Log handle for the server
	log: log::Logger,
	/// Whether the server updated during the handler session.
	did_update: Arc<AtomicBool>,
	/// Whether authentication is still required on the socket.
	auth_state: Arc<std::sync::Mutex<AuthState>>,
	/// A loopback channel to talk to the socket server task.
	socket_tx: mpsc::Sender<SocketSignal>,
	/// Configured launcher paths.
	launcher_paths: LauncherPaths,
	/// Connected VS Code Server
	code_server: CodeServerCell,
	/// Potentially many "websocket" connections to client
	server_bridges: ServerMultiplexer,
	/// The CLI arguments used to start the code server
	code_server_args: CodeServerArgs,
	/// port forwarding functionality
	port_forwarding: Option<PortForwarding>,
	/// install platform for the VS Code server
	platform: Platform,
	/// http client to make download/update requests
	http: Arc<FallbackSimpleHttp>,
	/// requests being served by the client
	http_requests: HttpRequestsMap,
}

/// Handler auth state.
enum AuthState {
	/// Auth is required; we're waiting for the client to request a challenge, optionally bearing a token.
	WaitingForChallenge(Option<String>),
	/// A challenge has been issued. Waiting for a verification.
	ChallengeIssued(String),
	/// Auth is no longer required.
	Authenticated,
}

static MESSAGE_ID_COUNTER: AtomicU32 = AtomicU32::new(0);

/// Gets the next incrementing number that can be used in logs.
pub fn next_message_id() -> u32 {
	MESSAGE_ID_COUNTER.fetch_add(1, Ordering::SeqCst)
}

impl HandlerContext {
	async fn dispose(&self) {
		self.server_bridges.dispose().await;
		info!(self.log, "Disposed of connection to running server.");
	}
}

enum ServerSignal {
	/// Signalled when the server has been updated and we want to respawn.
	/// We'd generally need to stop and then restart the launcher, but the
	/// program might be managed by a supervisor like systemd. Instead, we
	/// will stop the TCP listener and spawn the launcher again as a subprocess
	/// with the same arguments we used.
	Respawn,
}

pub enum Next {
	/// Whether the server should be respawned in a new binary (see ServerSignal.Respawn).
	Respawn,
	/// Whether the tunnel should be restarted
	Restart,
	/// Whether the process should exit
	Exit,
}

pub struct ServerTermination {
	pub next: Next,
	pub tunnel: ActiveTunnel,
}

async fn preload_extensions(
	log: &log::Logger,
	platform: Platform,
	mut args: CodeServerArgs,
	launcher_paths: LauncherPaths,
) -> Result<(), AnyError> {
	args.start_server = false;

	let params_raw = ServerParamsRaw {
		commit_id: None,
		quality: Quality::Stable,
		code_server_args: args.clone(),
		headless: true,
		platform,
	};

	// cannot use delegated HTTP here since there's no remote connection yet
	let http = Arc::new(ReqwestSimpleHttp::new());
	let resolved = params_raw.resolve(log, http.clone()).await?;
	let sb = ServerBuilder::new(log, &resolved, &launcher_paths, http.clone());

	sb.setup().await?;
	sb.install_extensions().await
}

/// Runs the launcher server. Exits on a ctrl+c or when requested by a user.
/// Note that client connections may not be closed when this returns; use
/// `close_all_clients()` on the ServerTermination to make this happen.
pub async fn serve(
	log: &log::Logger,
	mut tunnel: ActiveTunnel,
	launcher_paths: &LauncherPaths,
	code_server_args: &CodeServerArgs,
	platform: Platform,
	mut shutdown_rx: Barrier<ShutdownSignal>,
) -> Result<ServerTermination, AnyError> {
	let mut port = tunnel.add_port_direct(CONTROL_PORT).await?;
	let mut forwarding = PortForwardingProcessor::new();
	let (tx, mut rx) = mpsc::channel::<ServerSignal>(4);
	let (exit_barrier, signal_exit) = new_barrier();

	if !code_server_args.install_extensions.is_empty() {
		info!(
			log,
			"Preloading extensions using stable server: {:?}", code_server_args.install_extensions
		);
		let log = log.clone();
		let code_server_args = code_server_args.clone();
		let launcher_paths = launcher_paths.clone();
		// This runs asynchronously alongside the primary tunnel setup to keep startup fast.
		tokio::spawn(async move {
			if let Err(e) =
				preload_extensions(&log, platform, code_server_args, launcher_paths).await
			{
				warning!(log, "Failed to preload extensions: {:?}", e);
			} else {
				info!(log, "Extension install complete");
			}
		});
	}

	loop {
		tokio::select! {
			Ok(reason) = shutdown_rx.wait() => {
				info!(log, "Shutting down: {}", reason);
				drop(signal_exit);
				return Ok(ServerTermination {
					next: match reason {
						ShutdownSignal::RpcRestartRequested => Next::Restart,
						_ => Next::Exit,
					},
					tunnel,
				});
			},
			c = rx.recv() => {
				if let Some(ServerSignal::Respawn) = c {
					drop(signal_exit);
					return Ok(ServerTermination {
						next: Next::Respawn,
						tunnel,
					});
				}
			},
			Some(w) = forwarding.recv() => {
				forwarding.process(w, &mut tunnel).await;
			},
			l = port.recv() => {
				let socket = match l {
					Some(p) => p,
					None => {
						warning!(log, "ssh tunnel disposed, tearing down");
						return Ok(ServerTermination {
							next: Next::Restart,
							tunnel,
						});
					}
				};

				let own_log = log.prefixed(&log::new_rpc_prefix());
				let own_tx = tx.clone();
				let own_paths = launcher_paths.clone();
				let own_exit = exit_barrier.clone();
				let own_code_server_args = code_server_args.clone();
				let own_forwarding = forwarding.handle();

				tokio::spawn(async move {
					use opentelemetry::trace::{FutureExt, TraceContextExt};

					let span = own_log.span("server.socket").with_kind(SpanKind::Consumer).start(own_log.tracer());
					let cx = opentelemetry::Context::current_with_span(span);
					let serve_at = Instant::now();

					debug!(own_log, "Serving new connection");

					let (readhalf, writehalf) = socket.into_split();
					let stats = process_socket(readhalf, writehalf, own_tx, Some(own_forwarding), ServeStreamParams {
						log: own_log,
						launcher_paths: own_paths,
						code_server_args: own_code_server_args,
						platform,
						exit_barrier: own_exit,
						requires_auth: AuthRequired::None,
					}).with_context(cx.clone()).await;

					cx.span().add_event(
						"socket.bandwidth",
						vec![
							KeyValue::new("tx", stats.tx as f64),
							KeyValue::new("rx", stats.rx as f64),
							KeyValue::new("duration_ms", serve_at.elapsed().as_millis() as f64),
						],
					);
					cx.span().end();
				});
			}
		}
	}
}

#[derive(Clone)]
pub enum AuthRequired {
	None,
	VSDA,
	VSDAWithToken(String),
}

#[derive(Clone)]
pub struct ServeStreamParams {
	pub log: log::Logger,
	pub launcher_paths: LauncherPaths,
	pub code_server_args: CodeServerArgs,
	pub platform: Platform,
	pub requires_auth: AuthRequired,
	pub exit_barrier: Barrier<ShutdownSignal>,
}

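/// Serves a single read/write stream using the same RPC handling as tunnel
/// sockets, returning transfer statistics when the stream closes.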
pub async fn serve_stream(
	readhalf: impl AsyncRead + Send + Unpin + 'static,
	writehalf: impl AsyncWrite + Unpin,
	params: ServeStreamParams,
) -> SocketStats {
	// Currently the only server signal is respawn, which doesn't have much meaning
	// when serving a stream, so make an ignored channel.
	let (server_tx, server_rx) = mpsc::channel(1);
	drop(server_rx);

	process_socket(readhalf, writehalf, server_tx, None, params).await
}

pub struct SocketStats {
	rx: usize,
	tx: usize,
}

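/// Builds the msgpack-RPC dispatcher for a control socket, registering every
/// client-callable method against a shared `HandlerContext`.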
#[allow(clippy::too_many_arguments)]
fn make_socket_rpc(
	log: log::Logger,
	socket_tx: mpsc::Sender<SocketSignal>,
	http_delegated: DelegatedSimpleHttp,
	launcher_paths: LauncherPaths,
	code_server_args: CodeServerArgs,
	port_forwarding: Option<PortForwarding>,
	requires_auth: AuthRequired,
	platform: Platform,
	http_requests: HttpRequestsMap,
) -> RpcDispatcher<MsgPackSerializer, HandlerContext> {
	let server_bridges = ServerMultiplexer::new();
	let mut rpc = RpcBuilder::new(MsgPackSerializer {}).methods(HandlerContext {
		did_update: Arc::new(AtomicBool::new(false)),
		auth_state: Arc::new(std::sync::Mutex::new(match requires_auth {
			AuthRequired::VSDAWithToken(t) => AuthState::WaitingForChallenge(Some(t)),
			AuthRequired::VSDA => AuthState::WaitingForChallenge(None),
			AuthRequired::None => AuthState::Authenticated,
		})),
		socket_tx,
		log: log.clone(),
		launcher_paths,
		code_server_args,
		code_server: Arc::new(Mutex::new(None)),
		server_bridges,
		port_forwarding,
		platform,
		http: Arc::new(FallbackSimpleHttp::new(
			ReqwestSimpleHttp::new(),
			http_delegated,
		)),
		http_requests,
	});

	rpc.register_sync("ping", |_: EmptyObject, _| Ok(EmptyObject {}));
	rpc.register_sync("gethostname", |_: EmptyObject, _| handle_get_hostname());
	rpc.register_sync("sys_kill", |p: SysKillRequest, c| {
		ensure_auth(&c.auth_state)?;
		handle_sys_kill(p.pid)
	});
	rpc.register_sync("fs_stat", |p: FsSinglePathRequest, c| {
		ensure_auth(&c.auth_state)?;
		handle_stat(p.path)
	});
	rpc.register_duplex(
		"fs_read",
		1,
		move |mut streams, p: FsSinglePathRequest, c| async move {
			ensure_auth(&c.auth_state)?;
			handle_fs_read(streams.remove(0), p.path).await
		},
	);
	rpc.register_duplex(
		"fs_write",
		1,
		move |mut streams, p: FsSinglePathRequest, c| async move {
			ensure_auth(&c.auth_state)?;
			handle_fs_write(streams.remove(0), p.path).await
		},
	);
	rpc.register_duplex(
		"fs_connect",
		1,
		move |mut streams, p: FsSinglePathRequest, c| async move {
			ensure_auth(&c.auth_state)?;
			handle_fs_connect(streams.remove(0), p.path).await
		},
	);
	rpc.register_duplex(
		"net_connect",
		1,
		move |mut streams, n: NetConnectRequest, c| async move {
			ensure_auth(&c.auth_state)?;
			handle_net_connect(streams.remove(0), n).await
		},
	);
	rpc.register_async("fs_rm", move |p: FsSinglePathRequest, c| async move {
		ensure_auth(&c.auth_state)?;
		handle_fs_remove(p.path).await
	});
	rpc.register_sync("fs_mkdirp", |p: FsSinglePathRequest, c| {
		ensure_auth(&c.auth_state)?;
		handle_fs_mkdirp(p.path)
	});
	rpc.register_sync("fs_rename", |p: FsRenameRequest, c| {
		ensure_auth(&c.auth_state)?;
		handle_fs_rename(p.from_path, p.to_path)
	});
	rpc.register_sync("fs_readdir", |p: FsSinglePathRequest, c| {
		ensure_auth(&c.auth_state)?;
		handle_fs_readdir(p.path)
	});
	rpc.register_sync("get_env", |_: EmptyObject, c| {
		ensure_auth(&c.auth_state)?;
		handle_get_env()
	});
	rpc.register_sync(METHOD_CHALLENGE_ISSUE, |p: ChallengeIssueParams, c| {
		handle_challenge_issue(p, &c.auth_state)
	});
	rpc.register_sync(METHOD_CHALLENGE_VERIFY, |p: ChallengeVerifyParams, c| {
		handle_challenge_verify(p.response, &c.auth_state)
	});
	rpc.register_async("serve", move |params: ServeParams, c| async move {
		ensure_auth(&c.auth_state)?;
		handle_serve(c, params).await
	});
	rpc.register_async("update", |p: UpdateParams, c| async move {
		handle_update(&c.http, &c.log, &c.did_update, &p).await
	});
	rpc.register_sync("servermsg", |m: ServerMessageParams, c| {
		if let Err(e) = handle_server_message(&c.log, &c.server_bridges, m) {
			warning!(c.log, "error handling call: {:?}", e);
		}
		Ok(EmptyObject {})
	});
	rpc.register_sync("prune", |_: EmptyObject, c| handle_prune(&c.launcher_paths));
	rpc.register_async("callserverhttp", |p: CallServerHttpParams, c| async move {
		let code_server = c.code_server.lock().await.clone();
		handle_call_server_http(code_server, p).await
	});
	rpc.register_async("forward", |p: ForwardParams, c| async move {
		ensure_auth(&c.auth_state)?;
		handle_forward(&c.log, &c.port_forwarding, p).await
	});
	rpc.register_async("unforward", |p: UnforwardParams, c| async move {
		ensure_auth(&c.auth_state)?;
		handle_unforward(&c.log, &c.port_forwarding, p).await
	});
	rpc.register_async("acquire_cli", |p: AcquireCliParams, c| async move {
		ensure_auth(&c.auth_state)?;
		handle_acquire_cli(&c.launcher_paths, &c.http, &c.log, p).await
	});
	rpc.register_duplex("spawn", 3, |mut streams, p: SpawnParams, c| async move {
		ensure_auth(&c.auth_state)?;
		handle_spawn(
			&c.log,
			p,
			Some(streams.remove(0)),
			Some(streams.remove(0)),
			Some(streams.remove(0)),
		)
		.await
	});
	rpc.register_duplex(
		"spawn_cli",
		3,
		|mut streams, p: SpawnParams, c| async move {
			ensure_auth(&c.auth_state)?;
			handle_spawn_cli(
				&c.log,
				p,
				streams.remove(0),
				streams.remove(0),
				streams.remove(0),
			)
			.await
		},
	);
	rpc.register_sync("httpheaders", |p: HttpHeadersParams, c| {
		if let Some(req) = c.http_requests.lock().unwrap().get(&p.req_id) {
			trace!(c.log, "got {} response for req {}", p.status_code, p.req_id);
			req.initial_response(p.status_code, p.headers);
		} else {
			warning!(c.log, "got response for unknown req {}", p.req_id);
		}
		Ok(EmptyObject {})
	});
	rpc.register_sync("httpbody", move |p: HttpBodyParams, c| {
		let mut reqs = c.http_requests.lock().unwrap();
		if let Some(req) = reqs.get(&p.req_id) {
			if !p.segment.is_empty() {
				req.body(p.segment);
			}
			if p.complete {
				trace!(c.log, "delegated request {} completed", p.req_id);
				reqs.remove(&p.req_id);
			}
		}
		Ok(EmptyObject {})
	});
	rpc.register_sync(
		"version",
		|_: EmptyObject, _| Ok(VersionResponse::default()),
	);

	rpc.build(log)
}

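/// Returns an error unless the socket has completed the challenge/response
/// flow (or never required it).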
fn ensure_auth(auth_state: &Arc<std::sync::Mutex<AuthState>>) -> Result<(), AnyError> {
	if let AuthState::Authenticated = &*auth_state.lock().unwrap() {
		Ok(())
	} else {
		Err(CodeError::ServerAuthRequired.into())
	}
}

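/// Drives a single control connection: reads and dispatches RPC frames from
/// `readhalf`, writes outgoing messages and delegated HTTP requests to
/// `writehalf`, and returns transfer statistics when the socket closes.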
#[allow(clippy::too_many_arguments)] // necessary here
async fn process_socket(
	readhalf: impl AsyncRead + Send + Unpin + 'static,
	mut writehalf: impl AsyncWrite + Unpin,
	server_tx: mpsc::Sender<ServerSignal>,
	port_forwarding: Option<PortForwarding>,
	params: ServeStreamParams,
) -> SocketStats {
	let ServeStreamParams {
		mut exit_barrier,
		log,
		launcher_paths,
		code_server_args,
		platform,
		requires_auth,
	} = params;

	let (http_delegated, mut http_rx) = DelegatedSimpleHttp::new(log.clone());
	let (socket_tx, mut socket_rx) = mpsc::channel(4);
	let rx_counter = Arc::new(AtomicUsize::new(0));
	let http_requests = Arc::new(std::sync::Mutex::new(HashMap::new()));

	let already_authed = matches!(requires_auth, AuthRequired::None);
	let rpc = make_socket_rpc(
		log.clone(),
		socket_tx.clone(),
		http_delegated,
		launcher_paths,
		code_server_args,
		port_forwarding,
		requires_auth,
		platform,
		http_requests.clone(),
	);

	{
		let log = log.clone();
		let rx_counter = rx_counter.clone();
		let socket_tx = socket_tx.clone();
		let exit_barrier = exit_barrier.clone();
		tokio::spawn(async move {
			if already_authed {
				send_version(&socket_tx).await;
			}

			if let Err(e) =
				handle_socket_read(&log, readhalf, exit_barrier, &socket_tx, rx_counter, &rpc).await
			{
				debug!(log, "closing socket reader: {}", e);
				socket_tx
					.send(SocketSignal::CloseWith(CloseReason(format!("{}", e))))
					.await
					.ok();
			}

			let ctx = rpc.context();

			// The connection is now closed; ask to respawn if needed
			if ctx.did_update.load(Ordering::SeqCst) {
				server_tx.send(ServerSignal::Respawn).await.ok();
			}

			ctx.dispose().await;

			let _ = socket_tx
				.send(SocketSignal::CloseWith(CloseReason("eof".to_string())))
				.await;
		});
	}

	let mut tx_counter = 0;

	loop {
		tokio::select! {
			_ = exit_barrier.wait() => {
				writehalf.shutdown().await.ok();
				break;
			},
			Some(r) = http_rx.recv() => {
				let id = next_message_id();
				let serialized = rmp_serde::to_vec_named(&ToClientRequest {
					id: None,
					params: ClientRequestMethod::makehttpreq(HttpRequestParams {
						url: &r.url,
						method: r.method,
						req_id: id,
					}),
				})
				.unwrap();

				http_requests.lock().unwrap().insert(id, r);

				tx_counter += serialized.len();
				if let Err(e) = writehalf.write_all(&serialized).await {
					debug!(log, "Closing connection: {}", e);
					break;
				}
			}
			recv = socket_rx.recv() => match recv {
				None => break,
				Some(message) => match message {
					SocketSignal::Send(bytes) => {
						tx_counter += bytes.len();
						if let Err(e) = writehalf.write_all(&bytes).await {
							debug!(log, "Closing connection: {}", e);
							break;
						}
					}
					SocketSignal::CloseWith(reason) => {
						debug!(log, "Closing connection: {}", reason.0);
						break;
					}
				}
			}
		}
	}

	SocketStats {
		tx: tx_counter,
		rx: rx_counter.load(Ordering::Acquire),
	}
}

async fn send_version(tx: &mpsc::Sender<SocketSignal>) {
	tx.send(SocketSignal::from_message(&ToClientRequest {
		id: None,
		params: ClientRequestMethod::version(VersionResponse::default()),
	}))
	.await
	.ok();
}

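/// Reads and decodes msgpack frames from the socket until EOF or shutdown,
/// dispatching each frame on the RPC dispatcher and forwarding replies to
/// `socket_tx`.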
async fn handle_socket_read(
	_log: &log::Logger,
	readhalf: impl AsyncRead + Unpin,
	mut closer: Barrier<ShutdownSignal>,
	socket_tx: &mpsc::Sender<SocketSignal>,
	rx_counter: Arc<AtomicUsize>,
	rpc: &RpcDispatcher<MsgPackSerializer, HandlerContext>,
) -> Result<(), std::io::Error> {
	let mut readhalf = BufReader::new(readhalf);
	let mut decoder = MsgPackCodec::new();
	let mut decoder_buf = bytes::BytesMut::new();

	loop {
		let read_len = tokio::select! {
			r = readhalf.read_buf(&mut decoder_buf) => r,
			_ = closer.wait() => Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "eof")),
		}?;

		if read_len == 0 {
			return Ok(());
		}

		rx_counter.fetch_add(read_len, Ordering::Relaxed);

		while let Some(frame) = decoder.decode(&mut decoder_buf)? {
			match rpc.dispatch_with_partial(&frame.vec, frame.obj) {
				MaybeSync::Sync(Some(v)) => {
					if socket_tx.send(SocketSignal::Send(v)).await.is_err() {
						return Ok(());
					}
				}
				MaybeSync::Sync(None) => continue,
				MaybeSync::Future(fut) => {
					let socket_tx = socket_tx.clone();
					tokio::spawn(async move {
						if let Some(v) = fut.await {
							socket_tx.send(SocketSignal::Send(v)).await.ok();
						}
					});
				}
				MaybeSync::Stream((stream, fut)) => {
					if let Some(stream) = stream {
						rpc.register_stream(socket_tx.clone(), stream).await;
					}
					let socket_tx = socket_tx.clone();
					tokio::spawn(async move {
						if let Some(v) = fut.await {
							socket_tx.send(SocketSignal::Send(v)).await.ok();
						}
					});
				}
			}
		}
	}
}

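/// Log sink that forwards server install/run output to the connected client
/// as `serverlog` messages.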
#[derive(Clone)]
struct ServerOutputSink {
	tx: mpsc::Sender<SocketSignal>,
}

impl log::LogSink for ServerOutputSink {
	fn write_log(&self, level: log::Level, _prefix: &str, message: &str) {
		let s = SocketSignal::from_message(&ToClientRequest {
			id: None,
			params: ClientRequestMethod::serverlog(ServerLog {
				line: message,
				level: level.to_u8(),
			}),
		});

		self.tx.try_send(s).ok();
	}

	fn write_result(&self, _message: &str) {}
}

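/// Handles the `serve` request: resolves and (if needed) installs the
/// requested VS Code server, starts or reuses it, and attaches a server
/// bridge for the client's socket.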
async fn handle_serve(
	c: Arc<HandlerContext>,
	params: ServeParams,
) -> Result<EmptyObject, AnyError> {
	// fill params.extensions into code_server_args.install_extensions
	let mut csa = c.code_server_args.clone();
	csa.connection_token = params.connection_token.or(csa.connection_token);
	csa.install_extensions.extend(params.extensions.into_iter());

	let params_raw = ServerParamsRaw {
		commit_id: params.commit_id,
		quality: params.quality,
		code_server_args: csa,
		headless: true,
		platform: c.platform,
	};

	let resolved = if params.use_local_download {
		params_raw
			.resolve(&c.log, Arc::new(c.http.delegated()))
			.await
	} else {
		params_raw.resolve(&c.log, c.http.clone()).await
	}?;

	let mut server_ref = c.code_server.lock().await;
	let server = match &*server_ref {
		Some(o) => o.clone(),
		None => {
			let install_log = c.log.tee(ServerOutputSink {
				tx: c.socket_tx.clone(),
			});

			macro_rules! do_setup {
				($sb:expr) => {
					match $sb.get_running().await? {
						Some(AnyCodeServer::Socket(s)) => s,
						Some(_) => return Err(AnyError::from(MismatchedLaunchModeError())),
						None => {
							$sb.setup().await?;
							$sb.listen_on_default_socket().await?
						}
					}
				};
			}

			let server = if params.use_local_download {
				let sb = ServerBuilder::new(
					&install_log,
					&resolved,
					&c.launcher_paths,
					Arc::new(c.http.delegated()),
				);
				do_setup!(sb)
			} else {
				let sb =
					ServerBuilder::new(&install_log, &resolved, &c.launcher_paths, c.http.clone());
				do_setup!(sb)
			};

			server_ref.replace(server.clone());
			server
		}
	};

	attach_server_bridge(
		&c.log,
		server,
		c.socket_tx.clone(),
		c.server_bridges.clone(),
		params.socket_id,
		params.compress,
	)
	.await?;
	Ok(EmptyObject {})
}

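/// Creates a bridge to the code server's socket for the given client socket
/// ID, optionally compressing messages, and registers it on the multiplexer.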
async fn attach_server_bridge(
	log: &log::Logger,
	code_server: SocketCodeServer,
	socket_tx: mpsc::Sender<SocketSignal>,
	multiplexer: ServerMultiplexer,
	socket_id: u16,
	compress: bool,
) -> Result<u16, AnyError> {
	let (server_messages, decoder) = if compress {
		(
			ServerMessageSink::new_compressed(
				multiplexer.clone(),
				socket_id,
				ServerMessageDestination::Channel(socket_tx),
			),
			ClientMessageDecoder::new_compressed(),
		)
	} else {
		(
			ServerMessageSink::new_plain(
				multiplexer.clone(),
				socket_id,
				ServerMessageDestination::Channel(socket_tx),
			),
			ClientMessageDecoder::new_plain(),
		)
	};

	let attached_fut = ServerBridge::new(&code_server.socket, server_messages, decoder).await;
	match attached_fut {
		Ok(a) => {
			multiplexer.register(socket_id, a);
			trace!(log, "Attached to server");
			Ok(socket_id)
		}
		Err(e) => Err(e),
	}
}

/// Handle an incoming server message. This is synchronous and uses a 'write loop'
/// to ensure message order is preserved exactly, which is necessary for compression.
fn handle_server_message(
	log: &log::Logger,
	multiplexer: &ServerMultiplexer,
	params: ServerMessageParams,
) -> Result<EmptyObject, AnyError> {
	if multiplexer.write_message(log, params.i, params.body) {
		Ok(EmptyObject {})
	} else {
		Err(AnyError::from(NoAttachedServerError()))
	}
}

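/// Removes data directories for servers that are no longer running and
/// returns the pruned paths.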
fn handle_prune(paths: &LauncherPaths) -> Result<Vec<String>, AnyError> {
	prune_stopped_servers(paths).map(|v| {
		v.iter()
			.map(|p| p.server_dir.display().to_string())
			.collect()
	})
}

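/// Handles the `update` request: checks for a newer CLI release and, if
/// requested and not already updated on this connection, performs the
/// self-update.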
async fn handle_update(
	http: &Arc<FallbackSimpleHttp>,
	log: &log::Logger,
	did_update: &AtomicBool,
	params: &UpdateParams,
) -> Result<UpdateResult, AnyError> {
	if matches!(is_integrated_cli(), Ok(true)) || did_update.load(Ordering::SeqCst) {
		return Ok(UpdateResult {
			up_to_date: true,
			did_update: false,
		});
	}

	let update_service = UpdateService::new(log.clone(), http.clone());
	let updater = SelfUpdate::new(&update_service)?;
	let latest_release = updater.get_current_release().await?;
	let up_to_date = updater.is_up_to_date_with(&latest_release);

	let _ = updater.cleanup_old_update();

	if !params.do_update || up_to_date {
		return Ok(UpdateResult {
			up_to_date,
			did_update: false,
		});
	}

	if did_update
		.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
		.is_err()
	{
		return Ok(UpdateResult {
			up_to_date: true,
			did_update: true, // well, another thread did, but same difference...
		});
	}

	info!(log, "Updating CLI to {}", latest_release);

	updater
		.do_update(&latest_release, SilentCopyProgress())
		.await?;

	Ok(UpdateResult {
		up_to_date: true,
		did_update: true,
	})
}

fn handle_get_hostname() -> Result<GetHostnameResponse, AnyError> {
	Ok(GetHostnameResponse {
		value: gethostname::gethostname().to_string_lossy().into_owned(),
	})
}

fn handle_stat(path: String) -> Result<FsStatResponse, AnyError> {
	Ok(std::fs::metadata(path)
		.map(|m| FsStatResponse {
			exists: true,
			size: Some(m.len()),
			kind: Some(m.file_type().into()),
		})
		.unwrap_or_default())
}

async fn handle_fs_read(mut out: DuplexStream, path: String) -> Result<EmptyObject, AnyError> {
	let mut f = tokio::fs::File::open(path)
		.await
		.map_err(|e| wrap(e, "file not found"))?;

	tokio::io::copy(&mut f, &mut out)
		.await
		.map_err(|e| wrap(e, "error reading file"))?;

	Ok(EmptyObject {})
}

async fn handle_fs_write(mut input: DuplexStream, path: String) -> Result<EmptyObject, AnyError> {
	let mut f = tokio::fs::File::create(path)
		.await
		.map_err(|e| wrap(e, "error creating file"))?;

	tokio::io::copy(&mut input, &mut f)
		.await
		.map_err(|e| wrap(e, "error writing file"))?;

	Ok(EmptyObject {})
}

async fn handle_net_connect(
	mut stream: DuplexStream,
	req: NetConnectRequest,
) -> Result<EmptyObject, AnyError> {
	let mut s = TcpStream::connect((req.host, req.port))
		.await
		.map_err(|e| wrap(e, "could not connect to address"))?;

	tokio::io::copy_bidirectional(&mut stream, &mut s)
		.await
		.map_err(|e| wrap(e, "error copying stream data"))?;

	Ok(EmptyObject {})
}

async fn handle_fs_connect(
	mut stream: DuplexStream,
	path: String,
) -> Result<EmptyObject, AnyError> {
	let mut s = get_socket_rw_stream(&PathBuf::from(path))
		.await
		.map_err(|e| wrap(e, "could not connect to socket"))?;

	tokio::io::copy_bidirectional(&mut stream, &mut s)
		.await
		.map_err(|e| wrap(e, "error copying stream data"))?;

	Ok(EmptyObject {})
}

async fn handle_fs_remove(path: String) -> Result<EmptyObject, AnyError> {
	tokio::fs::remove_dir_all(path)
		.await
		.map_err(|e| wrap(e, "error removing directory"))?;
	Ok(EmptyObject {})
}

fn handle_fs_rename(from_path: String, to_path: String) -> Result<EmptyObject, AnyError> {
	std::fs::rename(from_path, to_path).map_err(|e| wrap(e, "error renaming"))?;
	Ok(EmptyObject {})
}

fn handle_fs_mkdirp(path: String) -> Result<EmptyObject, AnyError> {
	std::fs::create_dir_all(path).map_err(|e| wrap(e, "error creating directory"))?;
	Ok(EmptyObject {})
}

fn handle_fs_readdir(path: String) -> Result<FsReadDirResponse, AnyError> {
	let mut entries = std::fs::read_dir(path).map_err(|e| wrap(e, "error listing directory"))?;

	let mut contents = Vec::new();
	while let Some(Ok(child)) = entries.next() {
		contents.push(FsReadDirEntry {
			name: child.file_name().to_string_lossy().into_owned(),
			kind: child.file_type().ok().map(|v| v.into()),
		});
	}

	Ok(FsReadDirResponse { contents })
}

fn handle_sys_kill(pid: u32) -> Result<SysKillResponse, AnyError> {
	Ok(SysKillResponse {
		success: kill_pid(pid),
	})
}

fn handle_get_env() -> Result<GetEnvResponse, AnyError> {
	Ok(GetEnvResponse {
		env: std::env::vars().collect(),
		os_release: os_release().unwrap_or_else(|_| "unknown".to_string()),
		#[cfg(windows)]
		os_platform: "win32",
		#[cfg(target_os = "linux")]
		os_platform: "linux",
		#[cfg(target_os = "macos")]
		os_platform: "darwin",
	})
}

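/// Issues an authentication challenge, verifying the client-provided token
/// first when one is required.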
fn handle_challenge_issue(
	params: ChallengeIssueParams,
	auth_state: &Arc<std::sync::Mutex<AuthState>>,
) -> Result<ChallengeIssueResponse, AnyError> {
	let challenge = create_challenge();

	let mut auth_state = auth_state.lock().unwrap();
	if let AuthState::WaitingForChallenge(Some(s)) = &*auth_state {
		match &params.token {
			Some(t) if s != t => return Err(CodeError::AuthChallengeBadToken.into()),
			None => return Err(CodeError::AuthChallengeBadToken.into()),
			_ => {}
		}
	}

	*auth_state = AuthState::ChallengeIssued(challenge.clone());
	Ok(ChallengeIssueResponse { challenge })
}

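/// Verifies a challenge response and marks the socket as authenticated on
/// success.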
fn handle_challenge_verify(
	response: String,
	auth_state: &Arc<std::sync::Mutex<AuthState>>,
) -> Result<EmptyObject, AnyError> {
	let mut auth_state = auth_state.lock().unwrap();

	match &*auth_state {
		AuthState::Authenticated => Ok(EmptyObject {}),
		AuthState::WaitingForChallenge(_) => Err(CodeError::AuthChallengeNotIssued.into()),
		AuthState::ChallengeIssued(c) => match verify_challenge(c, &response) {
			false => Err(CodeError::AuthChallengeNotIssued.into()),
			true => {
				*auth_state = AuthState::Authenticated;
				Ok(EmptyObject {})
			}
		},
	}
}

async fn handle_forward(
	log: &log::Logger,
	port_forwarding: &Option<PortForwarding>,
	params: ForwardParams,
) -> Result<ForwardResult, AnyError> {
	let port_forwarding = port_forwarding
		.as_ref()
		.ok_or(CodeError::PortForwardingNotAvailable)?;
	info!(log, "Forwarding port {}", params.port);
	let uri = port_forwarding.forward(params.port).await?;
	Ok(ForwardResult { uri })
}

async fn handle_unforward(
	log: &log::Logger,
	port_forwarding: &Option<PortForwarding>,
	params: UnforwardParams,
) -> Result<EmptyObject, AnyError> {
	let port_forwarding = port_forwarding
		.as_ref()
		.ok_or(CodeError::PortForwardingNotAvailable)?;
	info!(log, "Unforwarding port {}", params.port);
	port_forwarding.unforward(params.port).await?;
	Ok(EmptyObject {})
}

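/// Proxies an HTTP request to the attached code server over its socket.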
async fn handle_call_server_http(
	code_server: Option<SocketCodeServer>,
	params: CallServerHttpParams,
) -> Result<CallServerHttpResult, AnyError> {
	use hyper::{body, client::conn::Builder, Body, Request};

	// We use Hyper directly here since reqwest doesn't support sockets/pipes.
	// See https://github.com/seanmonstar/reqwest/issues/39

	let socket = match &code_server {
		Some(cs) => &cs.socket,
		None => return Err(AnyError::from(NoAttachedServerError())),
	};

	let rw = get_socket_rw_stream(socket).await?;

	let (mut request_sender, connection) = Builder::new()
		.handshake(rw)
		.await
		.map_err(|e| wrap(e, "error establishing connection"))?;

	// start the connection processing; it's shut down when the sender is dropped
	tokio::spawn(connection);

	let mut request_builder = Request::builder()
		.method::<&str>(params.method.as_ref())
		.uri(format!("http://127.0.0.1{}", params.path))
		.header("Host", "127.0.0.1");

	for (k, v) in params.headers {
		request_builder = request_builder.header(k, v);
	}
	let request = request_builder
		.body(Body::from(params.body.unwrap_or_default()))
		.map_err(|e| wrap(e, "invalid request"))?;

	let response = request_sender
		.send_request(request)
		.await
		.map_err(|e| wrap(e, "error sending request"))?;

	Ok(CallServerHttpResult {
		status: response.status().as_u16(),
		headers: response
			.headers()
			.into_iter()
			.map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
			.collect(),
		body: body::to_bytes(response)
			.await
			.map_err(|e| wrap(e, "error reading response body"))?
			.to_vec(),
	})
}

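/// Downloads (or reuses a cached copy of) the requested CLI release and then
/// spawns it with the given spawn parameters.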
async fn handle_acquire_cli(
	paths: &LauncherPaths,
	http: &Arc<FallbackSimpleHttp>,
	log: &log::Logger,
	params: AcquireCliParams,
) -> Result<SpawnResult, AnyError> {
	let update_service = UpdateService::new(log.clone(), http.clone());

	let release = match params.commit_id {
		Some(commit) => Release {
			name: format!("{} CLI", PRODUCT_NAME_LONG),
			commit,
			platform: params.platform,
			quality: params.quality,
			target: TargetKind::Cli,
		},
		None => {
			update_service
				.get_latest_commit(params.platform, TargetKind::Cli, params.quality)
				.await?
		}
	};

	let cli = download_cli_into_cache(&paths.cli_cache, &release, &update_service).await?;
	let file = tokio::fs::File::open(cli)
		.await
		.map_err(|e| wrap(e, "error opening cli file"))?;

	handle_spawn::<_, DuplexStream>(log, params.spawn, Some(file), None, None).await
}

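/// Spawns a command, wiring any provided streams to the child's stdin,
/// stdout, and stderr, and waits for it to exit.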
async fn handle_spawn<Stdin, StdoutAndErr>(
	log: &log::Logger,
	params: SpawnParams,
	stdin: Option<Stdin>,
	stdout: Option<StdoutAndErr>,
	stderr: Option<StdoutAndErr>,
) -> Result<SpawnResult, AnyError>
where
	Stdin: AsyncRead + Unpin + Send + 'static,
	StdoutAndErr: AsyncWrite + Unpin + Send + 'static,
{
	debug!(
		log,
		"requested to spawn {} with args {:?}", params.command, params.args
	);

	macro_rules! pipe_if {
		($e: expr) => {
			if $e {
				Stdio::piped()
			} else {
				Stdio::null()
			}
		};
	}

	let mut p = new_tokio_command(&params.command);
	p.args(&params.args);
	p.envs(&params.env);
	p.stdin(pipe_if!(stdin.is_some()));
	p.stdout(pipe_if!(stdout.is_some()));
	p.stderr(pipe_if!(stderr.is_some()));
	if let Some(cwd) = &params.cwd {
		p.current_dir(cwd);
	}

	#[cfg(target_os = "windows")]
	p.creation_flags(winapi::um::winbase::CREATE_NO_WINDOW);

	let mut p = p.spawn().map_err(CodeError::ProcessSpawnFailed)?;

	let block_futs = FuturesUnordered::new();
	let poll_futs = FuturesUnordered::new();
	if let (Some(mut a), Some(mut b)) = (p.stdout.take(), stdout) {
		block_futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
	}
	if let (Some(mut a), Some(mut b)) = (p.stderr.take(), stderr) {
		block_futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
	}
	if let (Some(mut b), Some(mut a)) = (p.stdin.take(), stdin) {
		poll_futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
	}

	wait_for_process_exit(log, &params.command, p, block_futs, poll_futs).await
}

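/// Spawns another CLI in `command-shell` mode, authenticates with it over its
/// stdio, and then pipes the protocol and log streams until it exits.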
async fn handle_spawn_cli(
	log: &log::Logger,
	params: SpawnParams,
	mut protocol_in: DuplexStream,
	mut protocol_out: DuplexStream,
	mut log_out: DuplexStream,
) -> Result<SpawnResult, AnyError> {
	debug!(
		log,
		"requested to spawn cli {} with args {:?}", params.command, params.args
	);

	let mut p = new_tokio_command(&params.command);
	p.args(&params.args);

	// CLI args to spawn a server; contracted with clients that they should _not_ provide these.
	p.arg("--verbose");
	p.arg("command-shell");

	p.envs(&params.env);
	p.stdin(Stdio::piped());
	p.stdout(Stdio::piped());
	p.stderr(Stdio::piped());
	if let Some(cwd) = &params.cwd {
		p.current_dir(cwd);
	}

	let mut p = p.spawn().map_err(CodeError::ProcessSpawnFailed)?;

	let mut stdin = p.stdin.take().unwrap();
	let mut stdout = p.stdout.take().unwrap();
	let mut stderr = p.stderr.take().unwrap();

	// Start handling logs while doing the handshake in case there's some kind of error
	let log_pump = tokio::spawn(async move { tokio::io::copy(&mut stdout, &mut log_out).await });

	// note: intentionally do not wrap the child's stderr in a BufReader, since we
	// don't want to read anything other than our handshake messages.
	if let Err(e) = spawn_do_child_authentication(log, &mut stdin, &mut stderr).await {
		warning!(log, "failed to authenticate with child process {}", e);
		let _ = p.kill().await;
		return Err(e.into());
	}

	debug!(log, "cli authenticated, attaching stdio");
	let block_futs = FuturesUnordered::new();
	let poll_futs = FuturesUnordered::new();
	poll_futs.push(async move { tokio::io::copy(&mut protocol_in, &mut stdin).await }.boxed());
	block_futs.push(async move { tokio::io::copy(&mut stderr, &mut protocol_out).await }.boxed());
	block_futs.push(async move { log_pump.await.unwrap() }.boxed());

	wait_for_process_exit(log, &params.command, p, block_futs, poll_futs).await
}

type TokioCopyFuture = dyn futures::Future<Output = Result<u64, std::io::Error>> + Send;

async fn get_joined_result(
	mut process: tokio::process::Child,
	block_futs: FuturesUnordered<std::pin::Pin<Box<TokioCopyFuture>>>,
) -> Result<std::process::ExitStatus, std::io::Error> {
	let (_, r) = tokio::join!(futures::future::join_all(block_futs), process.wait());
	r
}

/// Waits for the process to exit and returns the spawn result. Waits until the
/// `block_futs` and the process have exited, and polls the `poll_futs` while
/// doing so.
async fn wait_for_process_exit(
	log: &log::Logger,
	command: &str,
	process: tokio::process::Child,
	block_futs: FuturesUnordered<std::pin::Pin<Box<TokioCopyFuture>>>,
	poll_futs: FuturesUnordered<std::pin::Pin<Box<TokioCopyFuture>>>,
) -> Result<SpawnResult, AnyError> {
	let joined = get_joined_result(process, block_futs);
	pin!(joined);

	let r = tokio::select! {
		_ = futures::future::join_all(poll_futs) => joined.await,
		r = &mut joined => r,
	};

	let r = match r {
		Ok(e) => SpawnResult {
			message: e.to_string(),
			exit_code: e.code().unwrap_or(-1),
		},
		Err(e) => SpawnResult {
			message: e.to_string(),
			exit_code: -1,
		},
	};

	debug!(
		log,
		"spawned cli {} exited with code {}", command, r.exit_code
	);

	Ok(r)
}

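/// Performs the challenge/response handshake with a spawned CLI over its
/// stdin/stderr before any other traffic is exchanged.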
async fn spawn_do_child_authentication(
	log: &log::Logger,
	stdin: &mut ChildStdin,
	stderr: &mut ChildStderr,
) -> Result<(), CodeError> {
	let (msg_tx, msg_rx) = mpsc::unbounded_channel();
	let (shutdown_rx, shutdown) = new_barrier();
	let mut rpc = new_msgpack_rpc();
	let caller = rpc.get_caller(msg_tx);

	let challenge_response = do_challenge_response_flow(caller, shutdown);
	let rpc = start_msgpack_rpc(
		rpc.methods(()).build(log.prefixed("client-auth")),
		stderr,
		stdin,
		msg_rx,
		shutdown_rx,
	);
	pin!(rpc);

	tokio::select! {
		r = &mut rpc => {
			match r {
				// means shutdown happened cleanly already, we're good
				Ok(_) => Ok(()),
				Err(e) => Err(CodeError::ProcessSpawnHandshakeFailed(e))
			}
		},
		r = challenge_response => {
			r?;
			rpc.await.map(|_| ()).map_err(CodeError::ProcessSpawnFailed)
		}
	}
}

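/// Requests a challenge from the child, signs it, sends the verification, and
/// opens the shutdown barrier once authentication succeeds.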
async fn do_challenge_response_flow(
	caller: RpcCaller<MsgPackSerializer>,
	shutdown: BarrierOpener<()>,
) -> Result<(), CodeError> {
	let challenge: ChallengeIssueResponse = caller
		.call(METHOD_CHALLENGE_ISSUE, EmptyObject {})
		.await
		.unwrap()
		.map_err(CodeError::TunnelRpcCallFailed)?;

	let _: EmptyObject = caller
		.call(
			METHOD_CHALLENGE_VERIFY,
			ChallengeVerifyParams {
				response: sign_challenge(&challenge.challenge),
			},
		)
		.await
		.unwrap()
		.map_err(CodeError::TunnelRpcCallFailed)?;

	shutdown.open(());

	Ok(())
}