1use crate::async_pipe::get_socket_rw_stream;
6use crate::constants::{CONTROL_PORT, PRODUCT_NAME_LONG};
7use crate::log;
8use crate::msgpack_rpc::{new_msgpack_rpc, start_msgpack_rpc, MsgPackCodec, MsgPackSerializer};
9use crate::options::Quality;
10use crate::rpc::{MaybeSync, RpcBuilder, RpcCaller, RpcDispatcher};
11use crate::self_update::SelfUpdate;
12use crate::state::LauncherPaths;
13use crate::tunnels::protocol::{HttpRequestParams, METHOD_CHALLENGE_ISSUE};
14use crate::tunnels::socket_signal::CloseReason;
15use crate::update_service::{Platform, Release, TargetKind, UpdateService};
16use crate::util::command::new_tokio_command;
17use crate::util::errors::{
18 wrap, AnyError, CodeError, MismatchedLaunchModeError, NoAttachedServerError,
19};
20use crate::util::http::{
21 DelegatedHttpRequest, DelegatedSimpleHttp, FallbackSimpleHttp, ReqwestSimpleHttp,
22};
23use crate::util::io::SilentCopyProgress;
24use crate::util::is_integrated_cli;
25use crate::util::machine::kill_pid;
26use crate::util::os::os_release;
27use crate::util::sync::{new_barrier, Barrier, BarrierOpener};
28
29use futures::stream::FuturesUnordered;
30use futures::FutureExt;
31use opentelemetry::trace::SpanKind;
32use opentelemetry::KeyValue;
33use std::collections::HashMap;
34use std::path::PathBuf;
35use std::process::Stdio;
36use tokio::net::TcpStream;
37use tokio::pin;
38use tokio::process::{ChildStderr, ChildStdin};
39use tokio_util::codec::Decoder;
40
41use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
42use std::sync::Arc;
43use std::time::Instant;
44use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, DuplexStream};
45use tokio::sync::{mpsc, Mutex};
46
47use super::challenge::{create_challenge, sign_challenge, verify_challenge};
48use super::code_server::{
49 download_cli_into_cache, AnyCodeServer, CodeServerArgs, ServerBuilder, ServerParamsRaw,
50 SocketCodeServer,
51};
52use super::dev_tunnels::ActiveTunnel;
53use super::paths::prune_stopped_servers;
54use super::port_forwarder::{PortForwarding, PortForwardingProcessor};
55use super::protocol::{
56 AcquireCliParams, CallServerHttpParams, CallServerHttpResult, ChallengeIssueParams,
57 ChallengeIssueResponse, ChallengeVerifyParams, ClientRequestMethod, EmptyObject, ForwardParams,
58 ForwardResult, FsReadDirEntry, FsReadDirResponse, FsRenameRequest, FsSinglePathRequest,
59 FsStatResponse, GetEnvResponse, GetHostnameResponse, HttpBodyParams, HttpHeadersParams,
60 NetConnectRequest, ServeParams, ServerLog, ServerMessageParams, SpawnParams, SpawnResult,
61 SysKillRequest, SysKillResponse, ToClientRequest, UnforwardParams, UpdateParams, UpdateResult,
62 VersionResponse, METHOD_CHALLENGE_VERIFY,
63};
64use super::server_bridge::ServerBridge;
65use super::server_multiplexer::ServerMultiplexer;
66use super::shutdown_signal::ShutdownSignal;
67use super::socket_signal::{
68 ClientMessageDecoder, ServerMessageDestination, ServerMessageSink, SocketSignal,
69};
70
71type HttpRequestsMap = Arc<std::sync::Mutex<HashMap<u32, DelegatedHttpRequest>>>;
72type CodeServerCell = Arc<Mutex<Option<SocketCodeServer>>>;
73
74struct HandlerContext {
75 log: log::Logger,
77 did_update: Arc<AtomicBool>,
79 auth_state: Arc<std::sync::Mutex<AuthState>>,
81 socket_tx: mpsc::Sender<SocketSignal>,
83 launcher_paths: LauncherPaths,
85 code_server: CodeServerCell,
87 server_bridges: ServerMultiplexer,
89 code_server_args: CodeServerArgs,
91 port_forwarding: Option<PortForwarding>,
93 platform: Platform,
95 http: Arc<FallbackSimpleHttp>,
97 http_requests: HttpRequestsMap,
99}
100
/// Authentication state machine for a single connection.
enum AuthState {
    /// No challenge has been issued yet. When a token is present, the client
    /// must supply the same token when it asks for a challenge.
    WaitingForChallenge(Option<String>),
    /// A challenge (the contained string) was issued and awaits verification.
    ChallengeIssued(String),
    /// The connection may call methods guarded by `ensure_auth`.
    Authenticated,
}
110
/// Process-wide counter backing [`next_message_id`].
static MESSAGE_ID_COUNTER: AtomicU32 = AtomicU32::new(0);

/// Returns the next message ID. IDs start at zero, increase by one per call,
/// and wrap around on overflow.
pub fn next_message_id() -> u32 {
    let id = MESSAGE_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
    id
}
117
118impl HandlerContext {
119 async fn dispose(&self) {
120 self.server_bridges.dispose().await;
121 info!(self.log, "Disposed of connection to running server.");
122 }
123}
124
/// Signals sent from a connection task back to the `serve` loop.
enum ServerSignal {
    /// Sent after a connection on which a self-update was performed has
    /// finished reading; `serve` then returns `Next::Respawn`.
    Respawn,
}
133
/// What the caller of `serve` should do once serving has ended.
pub enum Next {
    /// Returned after a connection signalled `ServerSignal::Respawn`.
    Respawn,
    /// Returned when a restart was requested over RPC or the control port
    /// stopped yielding connections.
    Restart,
    /// Returned for every other shutdown reason.
    Exit,
}
142
143pub struct ServerTermination {
144 pub next: Next,
145 pub tunnel: ActiveTunnel,
146}
147
148async fn preload_extensions(
149 log: &log::Logger,
150 platform: Platform,
151 mut args: CodeServerArgs,
152 launcher_paths: LauncherPaths,
153) -> Result<(), AnyError> {
154 args.start_server = false;
155
156 let params_raw = ServerParamsRaw {
157 commit_id: None,
158 quality: Quality::Stable,
159 code_server_args: args.clone(),
160 headless: true,
161 platform,
162 };
163
164 let http = Arc::new(ReqwestSimpleHttp::new());
166 let resolved = params_raw.resolve(log, http.clone()).await?;
167 let sb = ServerBuilder::new(log, &resolved, &launcher_paths, http.clone());
168
169 sb.setup().await?;
170 sb.install_extensions().await
171}
172
173pub async fn serve(
177 log: &log::Logger,
178 mut tunnel: ActiveTunnel,
179 launcher_paths: &LauncherPaths,
180 code_server_args: &CodeServerArgs,
181 platform: Platform,
182 mut shutdown_rx: Barrier<ShutdownSignal>,
183) -> Result<ServerTermination, AnyError> {
184 let mut port = tunnel.add_port_direct(CONTROL_PORT).await?;
185 let mut forwarding = PortForwardingProcessor::new();
186 let (tx, mut rx) = mpsc::channel::<ServerSignal>(4);
187 let (exit_barrier, signal_exit) = new_barrier();
188
189 if !code_server_args.install_extensions.is_empty() {
190 info!(
191 log,
192 "Preloading extensions using stable server: {:?}", code_server_args.install_extensions
193 );
194 let log = log.clone();
195 let code_server_args = code_server_args.clone();
196 let launcher_paths = launcher_paths.clone();
197 tokio::spawn(async move {
199 if let Err(e) =
200 preload_extensions(&log, platform, code_server_args, launcher_paths).await
201 {
202 warning!(log, "Failed to preload extensions: {:?}", e);
203 } else {
204 info!(log, "Extension install complete");
205 }
206 });
207 }
208
209 loop {
210 tokio::select! {
211 Ok(reason) = shutdown_rx.wait() => {
212 info!(log, "Shutting down: {}", reason);
213 drop(signal_exit);
214 return Ok(ServerTermination {
215 next: match reason {
216 ShutdownSignal::RpcRestartRequested => Next::Restart,
217 _ => Next::Exit,
218 },
219 tunnel,
220 });
221 },
222 c = rx.recv() => {
223 if let Some(ServerSignal::Respawn) = c {
224 drop(signal_exit);
225 return Ok(ServerTermination {
226 next: Next::Respawn,
227 tunnel,
228 });
229 }
230 },
231 Some(w) = forwarding.recv() => {
232 forwarding.process(w, &mut tunnel).await;
233 },
234 l = port.recv() => {
235 let socket = match l {
236 Some(p) => p,
237 None => {
238 warning!(log, "ssh tunnel disposed, tearing down");
239 return Ok(ServerTermination {
240 next: Next::Restart,
241 tunnel,
242 });
243 }
244 };
245
246 let own_log = log.prefixed(&log::new_rpc_prefix());
247 let own_tx = tx.clone();
248 let own_paths = launcher_paths.clone();
249 let own_exit = exit_barrier.clone();
250 let own_code_server_args = code_server_args.clone();
251 let own_forwarding = forwarding.handle();
252
253 tokio::spawn(async move {
254 use opentelemetry::trace::{FutureExt, TraceContextExt};
255
256 let span = own_log.span("server.socket").with_kind(SpanKind::Consumer).start(own_log.tracer());
257 let cx = opentelemetry::Context::current_with_span(span);
258 let serve_at = Instant::now();
259
260 debug!(own_log, "Serving new connection");
261
262 let (writehalf, readhalf) = socket.into_split();
263 let stats = process_socket(readhalf, writehalf, own_tx, Some(own_forwarding), ServeStreamParams {
264 log: own_log,
265 launcher_paths: own_paths,
266 code_server_args: own_code_server_args,
267 platform,
268 exit_barrier: own_exit,
269 requires_auth: AuthRequired::None,
270 }).with_context(cx.clone()).await;
271
272 cx.span().add_event(
273 "socket.bandwidth",
274 vec![
275 KeyValue::new("tx", stats.tx as f64),
276 KeyValue::new("rx", stats.rx as f64),
277 KeyValue::new("duration_ms", serve_at.elapsed().as_millis() as f64),
278 ],
279 );
280 cx.span().end();
281 });
282 }
283 }
284 }
285}
286
/// Authentication a connection must complete before it may call methods
/// guarded by `ensure_auth`.
#[derive(Clone)]
pub enum AuthRequired {
    /// No authentication; the connection starts out authenticated.
    None,
    /// The client must complete the challenge issue/verify exchange.
    VSDA,
    /// As `VSDA`, and the client must additionally present this token when it
    /// requests a challenge.
    VSDAWithToken(String),
}
293
294#[derive(Clone)]
295pub struct ServeStreamParams {
296 pub log: log::Logger,
297 pub launcher_paths: LauncherPaths,
298 pub code_server_args: CodeServerArgs,
299 pub platform: Platform,
300 pub requires_auth: AuthRequired,
301 pub exit_barrier: Barrier<ShutdownSignal>,
302}
303
304pub async fn serve_stream(
305 readhalf: impl AsyncRead + Send + Unpin + 'static,
306 writehalf: impl AsyncWrite + Unpin,
307 params: ServeStreamParams,
308) -> SocketStats {
309 let (server_rx, server_tx) = mpsc::channel(1);
312 drop(server_tx);
313
314 process_socket(readhalf, writehalf, server_rx, None, params).await
315}
316
/// Byte counters for a served connection.
pub struct SocketStats {
    /// Number of bytes read from the connection.
    rx: usize,
    /// Number of bytes written to the connection.
    tx: usize,
}
321
322#[allow(clippy::too_many_arguments)]
323fn make_socket_rpc(
324 log: log::Logger,
325 socket_tx: mpsc::Sender<SocketSignal>,
326 http_delegated: DelegatedSimpleHttp,
327 launcher_paths: LauncherPaths,
328 code_server_args: CodeServerArgs,
329 port_forwarding: Option<PortForwarding>,
330 requires_auth: AuthRequired,
331 platform: Platform,
332 http_requests: HttpRequestsMap,
333) -> RpcDispatcher<MsgPackSerializer, HandlerContext> {
334 let server_bridges = ServerMultiplexer::new();
335 let mut rpc = RpcBuilder::new(MsgPackSerializer {}).methods(HandlerContext {
336 did_update: Arc::new(AtomicBool::new(false)),
337 auth_state: Arc::new(std::sync::Mutex::new(match requires_auth {
338 AuthRequired::VSDAWithToken(t) => AuthState::WaitingForChallenge(Some(t)),
339 AuthRequired::VSDA => AuthState::WaitingForChallenge(None),
340 AuthRequired::None => AuthState::Authenticated,
341 })),
342 socket_tx,
343 log: log.clone(),
344 launcher_paths,
345 code_server_args,
346 code_server: Arc::new(Mutex::new(None)),
347 server_bridges,
348 port_forwarding,
349 platform,
350 http: Arc::new(FallbackSimpleHttp::new(
351 ReqwestSimpleHttp::new(),
352 http_delegated,
353 )),
354 http_requests,
355 });
356
357 rpc.register_sync("ping", |_: EmptyObject, _| Ok(EmptyObject {}));
358 rpc.register_sync("gethostname", |_: EmptyObject, _| handle_get_hostname());
359 rpc.register_sync("sys_kill", |p: SysKillRequest, c| {
360 ensure_auth(&c.auth_state)?;
361 handle_sys_kill(p.pid)
362 });
363 rpc.register_sync("fs_stat", |p: FsSinglePathRequest, c| {
364 ensure_auth(&c.auth_state)?;
365 handle_stat(p.path)
366 });
367 rpc.register_duplex(
368 "fs_read",
369 1,
370 move |mut streams, p: FsSinglePathRequest, c| async move {
371 ensure_auth(&c.auth_state)?;
372 handle_fs_read(streams.remove(0), p.path).await
373 },
374 );
375 rpc.register_duplex(
376 "fs_write",
377 1,
378 move |mut streams, p: FsSinglePathRequest, c| async move {
379 ensure_auth(&c.auth_state)?;
380 handle_fs_write(streams.remove(0), p.path).await
381 },
382 );
383 rpc.register_duplex(
384 "fs_connect",
385 1,
386 move |mut streams, p: FsSinglePathRequest, c| async move {
387 ensure_auth(&c.auth_state)?;
388 handle_fs_connect(streams.remove(0), p.path).await
389 },
390 );
391 rpc.register_duplex(
392 "net_connect",
393 1,
394 move |mut streams, n: NetConnectRequest, c| async move {
395 ensure_auth(&c.auth_state)?;
396 handle_net_connect(streams.remove(0), n).await
397 },
398 );
399 rpc.register_async("fs_rm", move |p: FsSinglePathRequest, c| async move {
400 ensure_auth(&c.auth_state)?;
401 handle_fs_remove(p.path).await
402 });
403 rpc.register_sync("fs_mkdirp", |p: FsSinglePathRequest, c| {
404 ensure_auth(&c.auth_state)?;
405 handle_fs_mkdirp(p.path)
406 });
407 rpc.register_sync("fs_rename", |p: FsRenameRequest, c| {
408 ensure_auth(&c.auth_state)?;
409 handle_fs_rename(p.from_path, p.to_path)
410 });
411 rpc.register_sync("fs_readdir", |p: FsSinglePathRequest, c| {
412 ensure_auth(&c.auth_state)?;
413 handle_fs_readdir(p.path)
414 });
415 rpc.register_sync("get_env", |_: EmptyObject, c| {
416 ensure_auth(&c.auth_state)?;
417 handle_get_env()
418 });
419 rpc.register_sync(METHOD_CHALLENGE_ISSUE, |p: ChallengeIssueParams, c| {
420 handle_challenge_issue(p, &c.auth_state)
421 });
422 rpc.register_sync(METHOD_CHALLENGE_VERIFY, |p: ChallengeVerifyParams, c| {
423 handle_challenge_verify(p.response, &c.auth_state)
424 });
425 rpc.register_async("serve", move |params: ServeParams, c| async move {
426 ensure_auth(&c.auth_state)?;
427 handle_serve(c, params).await
428 });
429 rpc.register_async("update", |p: UpdateParams, c| async move {
430 handle_update(&c.http, &c.log, &c.did_update, &p).await
431 });
432 rpc.register_sync("servermsg", |m: ServerMessageParams, c| {
433 if let Err(e) = handle_server_message(&c.log, &c.server_bridges, m) {
434 warning!(c.log, "error handling call: {:?}", e);
435 }
436 Ok(EmptyObject {})
437 });
438 rpc.register_sync("prune", |_: EmptyObject, c| handle_prune(&c.launcher_paths));
439 rpc.register_async("callserverhttp", |p: CallServerHttpParams, c| async move {
440 let code_server = c.code_server.lock().await.clone();
441 handle_call_server_http(code_server, p).await
442 });
443 rpc.register_async("forward", |p: ForwardParams, c| async move {
444 ensure_auth(&c.auth_state)?;
445 handle_forward(&c.log, &c.port_forwarding, p).await
446 });
447 rpc.register_async("unforward", |p: UnforwardParams, c| async move {
448 ensure_auth(&c.auth_state)?;
449 handle_unforward(&c.log, &c.port_forwarding, p).await
450 });
451 rpc.register_async("acquire_cli", |p: AcquireCliParams, c| async move {
452 ensure_auth(&c.auth_state)?;
453 handle_acquire_cli(&c.launcher_paths, &c.http, &c.log, p).await
454 });
455 rpc.register_duplex("spawn", 3, |mut streams, p: SpawnParams, c| async move {
456 ensure_auth(&c.auth_state)?;
457 handle_spawn(
458 &c.log,
459 p,
460 Some(streams.remove(0)),
461 Some(streams.remove(0)),
462 Some(streams.remove(0)),
463 )
464 .await
465 });
466 rpc.register_duplex(
467 "spawn_cli",
468 3,
469 |mut streams, p: SpawnParams, c| async move {
470 ensure_auth(&c.auth_state)?;
471 handle_spawn_cli(
472 &c.log,
473 p,
474 streams.remove(0),
475 streams.remove(0),
476 streams.remove(0),
477 )
478 .await
479 },
480 );
481 rpc.register_sync("httpheaders", |p: HttpHeadersParams, c| {
482 if let Some(req) = c.http_requests.lock().unwrap().get(&p.req_id) {
483 trace!(c.log, "got {} response for req {}", p.status_code, p.req_id);
484 req.initial_response(p.status_code, p.headers);
485 } else {
486 warning!(c.log, "got response for unknown req {}", p.req_id);
487 }
488 Ok(EmptyObject {})
489 });
490 rpc.register_sync("httpbody", move |p: HttpBodyParams, c| {
491 let mut reqs = c.http_requests.lock().unwrap();
492 if let Some(req) = reqs.get(&p.req_id) {
493 if !p.segment.is_empty() {
494 req.body(p.segment);
495 }
496 if p.complete {
497 trace!(c.log, "delegated request {} completed", p.req_id);
498 reqs.remove(&p.req_id);
499 }
500 }
501 Ok(EmptyObject {})
502 });
503 rpc.register_sync(
504 "version",
505 |_: EmptyObject, _| Ok(VersionResponse::default()),
506 );
507
508 rpc.build(log)
509}
510
511fn ensure_auth(is_authed: &Arc<std::sync::Mutex<AuthState>>) -> Result<(), AnyError> {
512 if let AuthState::Authenticated = &*is_authed.lock().unwrap() {
513 Ok(())
514 } else {
515 Err(CodeError::ServerAuthRequired.into())
516 }
517}
518
519#[allow(clippy::too_many_arguments)] async fn process_socket(
521 readhalf: impl AsyncRead + Send + Unpin + 'static,
522 mut writehalf: impl AsyncWrite + Unpin,
523 server_tx: mpsc::Sender<ServerSignal>,
524 port_forwarding: Option<PortForwarding>,
525 params: ServeStreamParams,
526) -> SocketStats {
527 let ServeStreamParams {
528 mut exit_barrier,
529 log,
530 launcher_paths,
531 code_server_args,
532 platform,
533 requires_auth,
534 } = params;
535
536 let (http_delegated, mut http_rx) = DelegatedSimpleHttp::new(log.clone());
537 let (socket_tx, mut socket_rx) = mpsc::channel(4);
538 let rx_counter = Arc::new(AtomicUsize::new(0));
539 let http_requests = Arc::new(std::sync::Mutex::new(HashMap::new()));
540
541 let already_authed = matches!(requires_auth, AuthRequired::None);
542 let rpc = make_socket_rpc(
543 log.clone(),
544 socket_tx.clone(),
545 http_delegated,
546 launcher_paths,
547 code_server_args,
548 port_forwarding,
549 requires_auth,
550 platform,
551 http_requests.clone(),
552 );
553
554 {
555 let log = log.clone();
556 let rx_counter = rx_counter.clone();
557 let socket_tx = socket_tx.clone();
558 let exit_barrier = exit_barrier.clone();
559 tokio::spawn(async move {
560 if already_authed {
561 send_version(&socket_tx).await;
562 }
563
564 if let Err(e) =
565 handle_socket_read(&log, readhalf, exit_barrier, &socket_tx, rx_counter, &rpc).await
566 {
567 debug!(log, "closing socket reader: {}", e);
568 socket_tx
569 .send(SocketSignal::CloseWith(CloseReason(format!("{}", e))))
570 .await
571 .ok();
572 }
573
574 let ctx = rpc.context();
575
576 if ctx.did_update.load(Ordering::SeqCst) {
578 server_tx.send(ServerSignal::Respawn).await.ok();
579 }
580
581 ctx.dispose().await;
582
583 let _ = socket_tx
584 .send(SocketSignal::CloseWith(CloseReason("eof".to_string())))
585 .await;
586 });
587 }
588
589 let mut tx_counter = 0;
590
591 loop {
592 tokio::select! {
593 _ = exit_barrier.wait() => {
594 writehalf.shutdown().await.ok();
595 break;
596 },
597 Some(r) = http_rx.recv() => {
598 let id = next_message_id();
599 let serialized = rmp_serde::to_vec_named(&ToClientRequest {
600 id: None,
601 params: ClientRequestMethod::makehttpreq(HttpRequestParams {
602 url: &r.url,
603 method: r.method,
604 req_id: id,
605 }),
606 })
607 .unwrap();
608
609 http_requests.lock().unwrap().insert(id, r);
610
611 tx_counter += serialized.len();
612 if let Err(e) = writehalf.write_all(&serialized).await {
613 debug!(log, "Closing connection: {}", e);
614 break;
615 }
616 }
617 recv = socket_rx.recv() => match recv {
618 None => break,
619 Some(message) => match message {
620 SocketSignal::Send(bytes) => {
621 tx_counter += bytes.len();
622 if let Err(e) = writehalf.write_all(&bytes).await {
623 debug!(log, "Closing connection: {}", e);
624 break;
625 }
626 }
627 SocketSignal::CloseWith(reason) => {
628 debug!(log, "Closing connection: {}", reason.0);
629 break;
630 }
631 }
632 }
633 }
634 }
635
636 SocketStats {
637 tx: tx_counter,
638 rx: rx_counter.load(Ordering::Acquire),
639 }
640}
641
642async fn send_version(tx: &mpsc::Sender<SocketSignal>) {
643 tx.send(SocketSignal::from_message(&ToClientRequest {
644 id: None,
645 params: ClientRequestMethod::version(VersionResponse::default()),
646 }))
647 .await
648 .ok();
649}
650async fn handle_socket_read(
651 _log: &log::Logger,
652 readhalf: impl AsyncRead + Unpin,
653 mut closer: Barrier<ShutdownSignal>,
654 socket_tx: &mpsc::Sender<SocketSignal>,
655 rx_counter: Arc<AtomicUsize>,
656 rpc: &RpcDispatcher<MsgPackSerializer, HandlerContext>,
657) -> Result<(), std::io::Error> {
658 let mut readhalf = BufReader::new(readhalf);
659 let mut decoder = MsgPackCodec::new();
660 let mut decoder_buf = bytes::BytesMut::new();
661
662 loop {
663 let read_len = tokio::select! {
664 r = readhalf.read_buf(&mut decoder_buf) => r,
665 _ = closer.wait() => Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "eof")),
666 }?;
667
668 if read_len == 0 {
669 return Ok(());
670 }
671
672 rx_counter.fetch_add(read_len, Ordering::Relaxed);
673
674 while let Some(frame) = decoder.decode(&mut decoder_buf)? {
675 match rpc.dispatch_with_partial(&frame.vec, frame.obj) {
676 MaybeSync::Sync(Some(v)) => {
677 if socket_tx.send(SocketSignal::Send(v)).await.is_err() {
678 return Ok(());
679 }
680 }
681 MaybeSync::Sync(None) => continue,
682 MaybeSync::Future(fut) => {
683 let socket_tx = socket_tx.clone();
684 tokio::spawn(async move {
685 if let Some(v) = fut.await {
686 socket_tx.send(SocketSignal::Send(v)).await.ok();
687 }
688 });
689 }
690 MaybeSync::Stream((stream, fut)) => {
691 if let Some(stream) = stream {
692 rpc.register_stream(socket_tx.clone(), stream).await;
693 }
694 let socket_tx = socket_tx.clone();
695 tokio::spawn(async move {
696 if let Some(v) = fut.await {
697 socket_tx.send(SocketSignal::Send(v)).await.ok();
698 }
699 });
700 }
701 }
702 }
703 }
704}
705
706#[derive(Clone)]
707struct ServerOutputSink {
708 tx: mpsc::Sender<SocketSignal>,
709}
710
711impl log::LogSink for ServerOutputSink {
712 fn write_log(&self, level: log::Level, _prefix: &str, message: &str) {
713 let s = SocketSignal::from_message(&ToClientRequest {
714 id: None,
715 params: ClientRequestMethod::serverlog(ServerLog {
716 line: message,
717 level: level.to_u8(),
718 }),
719 });
720
721 self.tx.try_send(s).ok();
722 }
723
724 fn write_result(&self, _message: &str) {}
725}
726
727async fn handle_serve(
728 c: Arc<HandlerContext>,
729 params: ServeParams,
730) -> Result<EmptyObject, AnyError> {
731 let mut csa = c.code_server_args.clone();
733 csa.connection_token = params.connection_token.or(csa.connection_token);
734 csa.install_extensions.extend(params.extensions.into_iter());
735
736 let params_raw = ServerParamsRaw {
737 commit_id: params.commit_id,
738 quality: params.quality,
739 code_server_args: csa,
740 headless: true,
741 platform: c.platform,
742 };
743
744 let resolved = if params.use_local_download {
745 params_raw
746 .resolve(&c.log, Arc::new(c.http.delegated()))
747 .await
748 } else {
749 params_raw.resolve(&c.log, c.http.clone()).await
750 }?;
751
752 let mut server_ref = c.code_server.lock().await;
753 let server = match &*server_ref {
754 Some(o) => o.clone(),
755 None => {
756 let install_log = c.log.tee(ServerOutputSink {
757 tx: c.socket_tx.clone(),
758 });
759
760 macro_rules! do_setup {
761 ($sb:expr) => {
762 match $sb.get_running().await? {
763 Some(AnyCodeServer::Socket(s)) => s,
764 Some(_) => return Err(AnyError::from(MismatchedLaunchModeError())),
765 None => {
766 $sb.setup().await?;
767 $sb.listen_on_default_socket().await?
768 }
769 }
770 };
771 }
772
773 let server = if params.use_local_download {
774 let sb = ServerBuilder::new(
775 &install_log,
776 &resolved,
777 &c.launcher_paths,
778 Arc::new(c.http.delegated()),
779 );
780 do_setup!(sb)
781 } else {
782 let sb =
783 ServerBuilder::new(&install_log, &resolved, &c.launcher_paths, c.http.clone());
784 do_setup!(sb)
785 };
786
787 server_ref.replace(server.clone());
788 server
789 }
790 };
791
792 attach_server_bridge(
793 &c.log,
794 server,
795 c.socket_tx.clone(),
796 c.server_bridges.clone(),
797 params.socket_id,
798 params.compress,
799 )
800 .await?;
801 Ok(EmptyObject {})
802}
803
804async fn attach_server_bridge(
805 log: &log::Logger,
806 code_server: SocketCodeServer,
807 socket_tx: mpsc::Sender<SocketSignal>,
808 multiplexer: ServerMultiplexer,
809 socket_id: u16,
810 compress: bool,
811) -> Result<u16, AnyError> {
812 let (server_messages, decoder) = if compress {
813 (
814 ServerMessageSink::new_compressed(
815 multiplexer.clone(),
816 socket_id,
817 ServerMessageDestination::Channel(socket_tx),
818 ),
819 ClientMessageDecoder::new_compressed(),
820 )
821 } else {
822 (
823 ServerMessageSink::new_plain(
824 multiplexer.clone(),
825 socket_id,
826 ServerMessageDestination::Channel(socket_tx),
827 ),
828 ClientMessageDecoder::new_plain(),
829 )
830 };
831
832 let attached_fut = ServerBridge::new(&code_server.socket, server_messages, decoder).await;
833 match attached_fut {
834 Ok(a) => {
835 multiplexer.register(socket_id, a);
836 trace!(log, "Attached to server");
837 Ok(socket_id)
838 }
839 Err(e) => Err(e),
840 }
841}
842
843fn handle_server_message(
846 log: &log::Logger,
847 multiplexer: &ServerMultiplexer,
848 params: ServerMessageParams,
849) -> Result<EmptyObject, AnyError> {
850 if multiplexer.write_message(log, params.i, params.body) {
851 Ok(EmptyObject {})
852 } else {
853 Err(AnyError::from(NoAttachedServerError()))
854 }
855}
856
857fn handle_prune(paths: &LauncherPaths) -> Result<Vec<String>, AnyError> {
858 prune_stopped_servers(paths).map(|v| {
859 v.iter()
860 .map(|p| p.server_dir.display().to_string())
861 .collect()
862 })
863}
864
865async fn handle_update(
866 http: &Arc<FallbackSimpleHttp>,
867 log: &log::Logger,
868 did_update: &AtomicBool,
869 params: &UpdateParams,
870) -> Result<UpdateResult, AnyError> {
871 if matches!(is_integrated_cli(), Ok(true)) || did_update.load(Ordering::SeqCst) {
872 return Ok(UpdateResult {
873 up_to_date: true,
874 did_update: false,
875 });
876 }
877
878 let update_service = UpdateService::new(log.clone(), http.clone());
879 let updater = SelfUpdate::new(&update_service)?;
880 let latest_release = updater.get_current_release().await?;
881 let up_to_date = updater.is_up_to_date_with(&latest_release);
882
883 let _ = updater.cleanup_old_update();
884
885 if !params.do_update || up_to_date {
886 return Ok(UpdateResult {
887 up_to_date,
888 did_update: false,
889 });
890 }
891
892 if did_update
893 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
894 .is_err()
895 {
896 return Ok(UpdateResult {
897 up_to_date: true,
898 did_update: true, });
900 }
901
902 info!(log, "Updating CLI to {}", latest_release);
903
904 updater
905 .do_update(&latest_release, SilentCopyProgress())
906 .await?;
907
908 Ok(UpdateResult {
909 up_to_date: true,
910 did_update: true,
911 })
912}
913
914fn handle_get_hostname() -> Result<GetHostnameResponse, AnyError> {
915 Ok(GetHostnameResponse {
916 value: gethostname::gethostname().to_string_lossy().into_owned(),
917 })
918}
919
920fn handle_stat(path: String) -> Result<FsStatResponse, AnyError> {
921 Ok(std::fs::metadata(path)
922 .map(|m| FsStatResponse {
923 exists: true,
924 size: Some(m.len()),
925 kind: Some(m.file_type().into()),
926 })
927 .unwrap_or_default())
928}
929
930async fn handle_fs_read(mut out: DuplexStream, path: String) -> Result<EmptyObject, AnyError> {
931 let mut f = tokio::fs::File::open(path)
932 .await
933 .map_err(|e| wrap(e, "file not found"))?;
934
935 tokio::io::copy(&mut f, &mut out)
936 .await
937 .map_err(|e| wrap(e, "error reading file"))?;
938
939 Ok(EmptyObject {})
940}
941
942async fn handle_fs_write(mut input: DuplexStream, path: String) -> Result<EmptyObject, AnyError> {
943 let mut f = tokio::fs::File::create(path)
944 .await
945 .map_err(|e| wrap(e, "file not found"))?;
946
947 tokio::io::copy(&mut input, &mut f)
948 .await
949 .map_err(|e| wrap(e, "error writing file"))?;
950
951 Ok(EmptyObject {})
952}
953
954async fn handle_net_connect(
955 mut stream: DuplexStream,
956 req: NetConnectRequest,
957) -> Result<EmptyObject, AnyError> {
958 let mut s = TcpStream::connect((req.host, req.port))
959 .await
960 .map_err(|e| wrap(e, "could not connect to address"))?;
961
962 tokio::io::copy_bidirectional(&mut stream, &mut s)
963 .await
964 .map_err(|e| wrap(e, "error copying stream data"))?;
965
966 Ok(EmptyObject {})
967}
968async fn handle_fs_connect(
969 mut stream: DuplexStream,
970 path: String,
971) -> Result<EmptyObject, AnyError> {
972 let mut s = get_socket_rw_stream(&PathBuf::from(path))
973 .await
974 .map_err(|e| wrap(e, "could not connect to socket"))?;
975
976 tokio::io::copy_bidirectional(&mut stream, &mut s)
977 .await
978 .map_err(|e| wrap(e, "error copying stream data"))?;
979
980 Ok(EmptyObject {})
981}
982
983async fn handle_fs_remove(path: String) -> Result<EmptyObject, AnyError> {
984 tokio::fs::remove_dir_all(path)
985 .await
986 .map_err(|e| wrap(e, "error removing directory"))?;
987 Ok(EmptyObject {})
988}
989
990fn handle_fs_rename(from_path: String, to_path: String) -> Result<EmptyObject, AnyError> {
991 std::fs::rename(from_path, to_path).map_err(|e| wrap(e, "error renaming"))?;
992 Ok(EmptyObject {})
993}
994
995fn handle_fs_mkdirp(path: String) -> Result<EmptyObject, AnyError> {
996 std::fs::create_dir_all(path).map_err(|e| wrap(e, "error creating directory"))?;
997 Ok(EmptyObject {})
998}
999
1000fn handle_fs_readdir(path: String) -> Result<FsReadDirResponse, AnyError> {
1001 let mut entries = std::fs::read_dir(path).map_err(|e| wrap(e, "error listing directory"))?;
1002
1003 let mut contents = Vec::new();
1004 while let Some(Ok(child)) = entries.next() {
1005 contents.push(FsReadDirEntry {
1006 name: child.file_name().to_string_lossy().into_owned(),
1007 kind: child.file_type().ok().map(|v| v.into()),
1008 });
1009 }
1010
1011 Ok(FsReadDirResponse { contents })
1012}
1013
1014fn handle_sys_kill(pid: u32) -> Result<SysKillResponse, AnyError> {
1015 Ok(SysKillResponse {
1016 success: kill_pid(pid),
1017 })
1018}
1019
1020fn handle_get_env() -> Result<GetEnvResponse, AnyError> {
1021 Ok(GetEnvResponse {
1022 env: std::env::vars().collect(),
1023 os_release: os_release().unwrap_or_else(|_| "unknown".to_string()),
1024 #[cfg(windows)]
1025 os_platform: "win32",
1026 #[cfg(target_os = "linux")]
1027 os_platform: "linux",
1028 #[cfg(target_os = "macos")]
1029 os_platform: "darwin",
1030 })
1031}
1032
1033fn handle_challenge_issue(
1034 params: ChallengeIssueParams,
1035 auth_state: &Arc<std::sync::Mutex<AuthState>>,
1036) -> Result<ChallengeIssueResponse, AnyError> {
1037 let challenge = create_challenge();
1038
1039 let mut auth_state = auth_state.lock().unwrap();
1040 if let AuthState::WaitingForChallenge(Some(s)) = &*auth_state {
1041 println!("looking for token {}, got {:?}", s, params.token);
1042 match ¶ms.token {
1043 Some(t) if s != t => return Err(CodeError::AuthChallengeBadToken.into()),
1044 None => return Err(CodeError::AuthChallengeBadToken.into()),
1045 _ => {}
1046 }
1047 }
1048
1049 *auth_state = AuthState::ChallengeIssued(challenge.clone());
1050 Ok(ChallengeIssueResponse { challenge })
1051}
1052
1053fn handle_challenge_verify(
1054 response: String,
1055 auth_state: &Arc<std::sync::Mutex<AuthState>>,
1056) -> Result<EmptyObject, AnyError> {
1057 let mut auth_state = auth_state.lock().unwrap();
1058
1059 match &*auth_state {
1060 AuthState::Authenticated => Ok(EmptyObject {}),
1061 AuthState::WaitingForChallenge(_) => Err(CodeError::AuthChallengeNotIssued.into()),
1062 AuthState::ChallengeIssued(c) => match verify_challenge(c, &response) {
1063 false => Err(CodeError::AuthChallengeNotIssued.into()),
1064 true => {
1065 *auth_state = AuthState::Authenticated;
1066 Ok(EmptyObject {})
1067 }
1068 },
1069 }
1070}
1071
1072async fn handle_forward(
1073 log: &log::Logger,
1074 port_forwarding: &Option<PortForwarding>,
1075 params: ForwardParams,
1076) -> Result<ForwardResult, AnyError> {
1077 let port_forwarding = port_forwarding
1078 .as_ref()
1079 .ok_or(CodeError::PortForwardingNotAvailable)?;
1080 info!(log, "Forwarding port {}", params.port);
1081 let uri = port_forwarding.forward(params.port).await?;
1082 Ok(ForwardResult { uri })
1083}
1084
1085async fn handle_unforward(
1086 log: &log::Logger,
1087 port_forwarding: &Option<PortForwarding>,
1088 params: UnforwardParams,
1089) -> Result<EmptyObject, AnyError> {
1090 let port_forwarding = port_forwarding
1091 .as_ref()
1092 .ok_or(CodeError::PortForwardingNotAvailable)?;
1093 info!(log, "Unforwarding port {}", params.port);
1094 port_forwarding.unforward(params.port).await?;
1095 Ok(EmptyObject {})
1096}
1097
1098async fn handle_call_server_http(
1099 code_server: Option<SocketCodeServer>,
1100 params: CallServerHttpParams,
1101) -> Result<CallServerHttpResult, AnyError> {
1102 use hyper::{body, client::conn::Builder, Body, Request};
1103
1104 let socket = match &code_server {
1108 Some(cs) => &cs.socket,
1109 None => return Err(AnyError::from(NoAttachedServerError())),
1110 };
1111
1112 let rw = get_socket_rw_stream(socket).await?;
1113
1114 let (mut request_sender, connection) = Builder::new()
1115 .handshake(rw)
1116 .await
1117 .map_err(|e| wrap(e, "error establishing connection"))?;
1118
1119 tokio::spawn(connection);
1121
1122 let mut request_builder = Request::builder()
1123 .method::<&str>(params.method.as_ref())
1124 .uri(format!("http://127.0.0.1{}", params.path))
1125 .header("Host", "127.0.0.1");
1126
1127 for (k, v) in params.headers {
1128 request_builder = request_builder.header(k, v);
1129 }
1130 let request = request_builder
1131 .body(Body::from(params.body.unwrap_or_default()))
1132 .map_err(|e| wrap(e, "invalid request"))?;
1133
1134 let response = request_sender
1135 .send_request(request)
1136 .await
1137 .map_err(|e| wrap(e, "error sending request"))?;
1138
1139 Ok(CallServerHttpResult {
1140 status: response.status().as_u16(),
1141 headers: response
1142 .headers()
1143 .into_iter()
1144 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
1145 .collect(),
1146 body: body::to_bytes(response)
1147 .await
1148 .map_err(|e| wrap(e, "error reading response body"))?
1149 .to_vec(),
1150 })
1151}
1152
1153async fn handle_acquire_cli(
1154 paths: &LauncherPaths,
1155 http: &Arc<FallbackSimpleHttp>,
1156 log: &log::Logger,
1157 params: AcquireCliParams,
1158) -> Result<SpawnResult, AnyError> {
1159 let update_service = UpdateService::new(log.clone(), http.clone());
1160
1161 let release = match params.commit_id {
1162 Some(commit) => Release {
1163 name: format!("{} CLI", PRODUCT_NAME_LONG),
1164 commit,
1165 platform: params.platform,
1166 quality: params.quality,
1167 target: TargetKind::Cli,
1168 },
1169 None => {
1170 update_service
1171 .get_latest_commit(params.platform, TargetKind::Cli, params.quality)
1172 .await?
1173 }
1174 };
1175
1176 let cli = download_cli_into_cache(&paths.cli_cache, &release, &update_service).await?;
1177 let file = tokio::fs::File::open(cli)
1178 .await
1179 .map_err(|e| wrap(e, "error opening cli file"))?;
1180
1181 handle_spawn::<_, DuplexStream>(log, params.spawn, Some(file), None, None).await
1182}
1183
1184async fn handle_spawn<Stdin, StdoutAndErr>(
1185 log: &log::Logger,
1186 params: SpawnParams,
1187 stdin: Option<Stdin>,
1188 stdout: Option<StdoutAndErr>,
1189 stderr: Option<StdoutAndErr>,
1190) -> Result<SpawnResult, AnyError>
1191where
1192 Stdin: AsyncRead + Unpin + Send + 'static,
1193 StdoutAndErr: AsyncWrite + Unpin + Send + 'static,
1194{
1195 debug!(
1196 log,
1197 "requested to spawn {} with args {:?}", params.command, params.args
1198 );
1199
1200 macro_rules! pipe_if {
1201 ($e: expr) => {
1202 if $e {
1203 Stdio::piped()
1204 } else {
1205 Stdio::null()
1206 }
1207 };
1208 }
1209
1210 let mut p = new_tokio_command(¶ms.command);
1211 p.args(¶ms.args);
1212 p.envs(¶ms.env);
1213 p.stdin(pipe_if!(stdin.is_some()));
1214 p.stdout(pipe_if!(stdin.is_some()));
1215 p.stderr(pipe_if!(stderr.is_some()));
1216 if let Some(cwd) = ¶ms.cwd {
1217 p.current_dir(cwd);
1218 }
1219
1220 #[cfg(target_os = "windows")]
1221 p.creation_flags(winapi::um::winbase::CREATE_NO_WINDOW);
1222
1223 let mut p = p.spawn().map_err(CodeError::ProcessSpawnFailed)?;
1224
1225 let block_futs = FuturesUnordered::new();
1226 let poll_futs = FuturesUnordered::new();
1227 if let (Some(mut a), Some(mut b)) = (p.stdout.take(), stdout) {
1228 block_futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
1229 }
1230 if let (Some(mut a), Some(mut b)) = (p.stderr.take(), stderr) {
1231 block_futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
1232 }
1233 if let (Some(mut b), Some(mut a)) = (p.stdin.take(), stdin) {
1234 poll_futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
1235 }
1236
1237 wait_for_process_exit(log, ¶ms.command, p, block_futs, poll_futs).await
1238}
1239
1240async fn handle_spawn_cli(
1241 log: &log::Logger,
1242 params: SpawnParams,
1243 mut protocol_in: DuplexStream,
1244 mut protocol_out: DuplexStream,
1245 mut log_out: DuplexStream,
1246) -> Result<SpawnResult, AnyError> {
1247 debug!(
1248 log,
1249 "requested to spawn cli {} with args {:?}", params.command, params.args
1250 );
1251
1252 let mut p = new_tokio_command(¶ms.command);
1253 p.args(¶ms.args);
1254
1255 p.arg("--verbose");
1257 p.arg("command-shell");
1258
1259 p.envs(¶ms.env);
1260 p.stdin(Stdio::piped());
1261 p.stdout(Stdio::piped());
1262 p.stderr(Stdio::piped());
1263 if let Some(cwd) = ¶ms.cwd {
1264 p.current_dir(cwd);
1265 }
1266
1267 let mut p = p.spawn().map_err(CodeError::ProcessSpawnFailed)?;
1268
1269 let mut stdin = p.stdin.take().unwrap();
1270 let mut stdout = p.stdout.take().unwrap();
1271 let mut stderr = p.stderr.take().unwrap();
1272
1273 let log_pump = tokio::spawn(async move { tokio::io::copy(&mut stdout, &mut log_out).await });
1275
1276 if let Err(e) = spawn_do_child_authentication(log, &mut stdin, &mut stderr).await {
1279 warning!(log, "failed to authenticate with child process {}", e);
1280 let _ = p.kill().await;
1281 return Err(e.into());
1282 }
1283
1284 debug!(log, "cli authenticated, attaching stdio");
1285 let block_futs = FuturesUnordered::new();
1286 let poll_futs = FuturesUnordered::new();
1287 poll_futs.push(async move { tokio::io::copy(&mut protocol_in, &mut stdin).await }.boxed());
1288 block_futs.push(async move { tokio::io::copy(&mut stderr, &mut protocol_out).await }.boxed());
1289 block_futs.push(async move { log_pump.await.unwrap() }.boxed());
1290
1291 wait_for_process_exit(log, ¶ms.command, p, block_futs, poll_futs).await
1292}
1293
/// Unsized future type shared by the stdio copy tasks: it resolves to the
/// `u64` produced by the copy, or to the I/O error that ended it.
type TokioCopyFuture = dyn std::future::Future<Output = std::io::Result<u64>> + Send;
1295
1296async fn get_joined_result(
1297 mut process: tokio::process::Child,
1298 block_futs: FuturesUnordered<std::pin::Pin<Box<TokioCopyFuture>>>,
1299) -> Result<std::process::ExitStatus, std::io::Error> {
1300 let (_, r) = tokio::join!(futures::future::join_all(block_futs), process.wait());
1301 r
1302}
1303
1304async fn wait_for_process_exit(
1308 log: &log::Logger,
1309 command: &str,
1310 process: tokio::process::Child,
1311 block_futs: FuturesUnordered<std::pin::Pin<Box<TokioCopyFuture>>>,
1312 poll_futs: FuturesUnordered<std::pin::Pin<Box<TokioCopyFuture>>>,
1313) -> Result<SpawnResult, AnyError> {
1314 let joined = get_joined_result(process, block_futs);
1315 pin!(joined);
1316
1317 let r = tokio::select! {
1318 _ = futures::future::join_all(poll_futs) => joined.await,
1319 r = &mut joined => r,
1320 };
1321
1322 let r = match r {
1323 Ok(e) => SpawnResult {
1324 message: e.to_string(),
1325 exit_code: e.code().unwrap_or(-1),
1326 },
1327 Err(e) => SpawnResult {
1328 message: e.to_string(),
1329 exit_code: -1,
1330 },
1331 };
1332
1333 debug!(
1334 log,
1335 "spawned cli {} exited with code {}", command, r.exit_code
1336 );
1337
1338 Ok(r)
1339}
1340
1341async fn spawn_do_child_authentication(
1342 log: &log::Logger,
1343 stdin: &mut ChildStdin,
1344 stdout: &mut ChildStderr,
1345) -> Result<(), CodeError> {
1346 let (msg_tx, msg_rx) = mpsc::unbounded_channel();
1347 let (shutdown_rx, shutdown) = new_barrier();
1348 let mut rpc = new_msgpack_rpc();
1349 let caller = rpc.get_caller(msg_tx);
1350
1351 let challenge_response = do_challenge_response_flow(caller, shutdown);
1352 let rpc = start_msgpack_rpc(
1353 rpc.methods(()).build(log.prefixed("client-auth")),
1354 stdout,
1355 stdin,
1356 msg_rx,
1357 shutdown_rx,
1358 );
1359 pin!(rpc);
1360
1361 tokio::select! {
1362 r = &mut rpc => {
1363 match r {
1364 Ok(_) => Ok(()),
1366 Err(e) => Err(CodeError::ProcessSpawnHandshakeFailed(e))
1367 }
1368 },
1369 r = challenge_response => {
1370 r?;
1371 rpc.await.map(|_| ()).map_err(CodeError::ProcessSpawnFailed)
1372 }
1373 }
1374}
1375
1376async fn do_challenge_response_flow(
1377 caller: RpcCaller<MsgPackSerializer>,
1378 shutdown: BarrierOpener<()>,
1379) -> Result<(), CodeError> {
1380 let challenge: ChallengeIssueResponse = caller
1381 .call(METHOD_CHALLENGE_ISSUE, EmptyObject {})
1382 .await
1383 .unwrap()
1384 .map_err(CodeError::TunnelRpcCallFailed)?;
1385
1386 let _: EmptyObject = caller
1387 .call(
1388 METHOD_CHALLENGE_VERIFY,
1389 ChallengeVerifyParams {
1390 response: sign_challenge(&challenge.challenge),
1391 },
1392 )
1393 .await
1394 .unwrap()
1395 .map_err(CodeError::TunnelRpcCallFailed)?;
1396
1397 shutdown.open(());
1398
1399 Ok(())
1400}