1use async_trait::async_trait;
7use base64::{engine::general_purpose as b64, Engine as _};
8use futures::{stream::FuturesUnordered, StreamExt};
9use serde::Serialize;
10use sha2::{Digest, Sha256};
11use std::{
12 net::{IpAddr, Ipv4Addr, SocketAddr},
13 str::FromStr,
14 time::Duration,
15};
16use sysinfo::Pid;
17use tokio::{
18 io::{AsyncBufReadExt, BufReader},
19 sync::watch,
20};
21
22use super::{
23 args::{
24 AuthProvider, CliCore, CommandShellArgs, ExistingTunnelArgs, TunnelForwardArgs,
25 TunnelRenameArgs, TunnelServeArgs, TunnelServiceSubCommands, TunnelUserSubCommands,
26 },
27 CommandContext,
28};
29
30use crate::{
31 async_pipe::{get_socket_name, listen_socket_rw_stream, AsyncRWAccepter},
32 auth::Auth,
33 constants::{
34 APPLICATION_NAME, CONTROL_PORT, IS_A_TTY, TUNNEL_CLI_LOCK_NAME, TUNNEL_SERVICE_LOCK_NAME,
35 },
36 log,
37 state::LauncherPaths,
38 tunnels::{
39 code_server::CodeServerArgs,
40 create_service_manager,
41 dev_tunnels::{self, DevTunnels},
42 legal, local_forwarding,
43 paths::get_all_servers,
44 protocol, serve_stream,
45 shutdown_signal::ShutdownRequest,
46 singleton_client::do_single_rpc_call,
47 singleton_server::{
48 make_singleton_server, start_singleton_server, BroadcastLogSink, SingletonServerArgs,
49 },
50 AuthRequired, Next, ServeStreamParams, ServiceContainer, ServiceManager,
51 },
52 util::{
53 app_lock::AppMutex,
54 command::new_std_command,
55 errors::{wrap, AnyError, CodeError},
56 machine::canonical_exe,
57 prereqs::PreReqChecker,
58 },
59};
60use crate::{
61 singleton::{acquire_singleton, SingletonConnection},
62 tunnels::{
63 dev_tunnels::ActiveTunnel,
64 singleton_client::{start_singleton_client, SingletonClientArgs},
65 SleepInhibitor,
66 },
67};
68
69impl From<AuthProvider> for crate::auth::AuthProvider {
70 fn from(auth_provider: AuthProvider) -> Self {
71 match auth_provider {
72 AuthProvider::Github => crate::auth::AuthProvider::Github,
73 AuthProvider::Microsoft => crate::auth::AuthProvider::Microsoft,
74 }
75 }
76}
77
78fn fulfill_existing_tunnel_args(
79 d: ExistingTunnelArgs,
80 name_arg: &Option<String>,
81) -> Option<dev_tunnels::ExistingTunnel> {
82 let tunnel_name = d.tunnel_name.or_else(|| name_arg.clone());
83
84 match (d.tunnel_id, d.cluster, d.host_token) {
85 (Some(tunnel_id), None, Some(host_token)) => {
86 let i = tunnel_id.find('.')?;
87 Some(dev_tunnels::ExistingTunnel {
88 tunnel_id: tunnel_id[..i].to_string(),
89 cluster: tunnel_id[i + 1..].to_string(),
90 tunnel_name,
91 host_token,
92 })
93 }
94
95 (Some(tunnel_id), Some(cluster), Some(host_token)) => Some(dev_tunnels::ExistingTunnel {
96 tunnel_id,
97 tunnel_name,
98 host_token,
99 cluster,
100 }),
101
102 _ => None,
103 }
104}
105
106struct TunnelServiceContainer {
107 args: CliCore,
108}
109
110impl TunnelServiceContainer {
111 fn new(args: CliCore) -> Self {
112 Self { args }
113 }
114}
115
116#[async_trait]
117impl ServiceContainer for TunnelServiceContainer {
118 async fn run_service(
119 &mut self,
120 log: log::Logger,
121 launcher_paths: LauncherPaths,
122 ) -> Result<(), AnyError> {
123 let csa = (&self.args).into();
124 serve_with_csa(
125 launcher_paths,
126 log,
127 TunnelServeArgs {
128 random_name: true, ..Default::default()
130 },
131 csa,
132 TUNNEL_SERVICE_LOCK_NAME,
133 )
134 .await?;
135 Ok(())
136 }
137}
138
139pub async fn command_shell(ctx: CommandContext, args: CommandShellArgs) -> Result<i32, AnyError> {
140 let platform = PreReqChecker::new().verify().await?;
141 let mut shutdown_reqs = vec![ShutdownRequest::CtrlC];
142 if let Some(p) = args.parent_process_id.and_then(|p| Pid::from_str(&p).ok()) {
143 shutdown_reqs.push(ShutdownRequest::ParentProcessKilled(p));
144 }
145
146 let mut params = ServeStreamParams {
147 log: ctx.log,
148 launcher_paths: ctx.paths,
149 platform,
150 requires_auth: args
151 .require_token
152 .map(AuthRequired::VSDAWithToken)
153 .unwrap_or(AuthRequired::VSDA),
154 exit_barrier: ShutdownRequest::create_rx(shutdown_reqs),
155 code_server_args: (&ctx.args).into(),
156 };
157
158 let mut listener: Box<dyn AsyncRWAccepter> = match (args.on_port, args.on_socket) {
159 (_, true) => {
160 let socket = get_socket_name();
161 let listener = listen_socket_rw_stream(&socket)
162 .await
163 .map_err(|e| wrap(e, "error listening on socket"))?;
164
165 params
166 .log
167 .result(format!("Listening on {}", socket.display()));
168
169 Box::new(listener)
170 }
171 (Some(p), _) => {
172 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), p);
173 let listener = tokio::net::TcpListener::bind(addr)
174 .await
175 .map_err(|e| wrap(e, "error listening on port"))?;
176
177 params
178 .log
179 .result(format!("Listening on {}", listener.local_addr().unwrap()));
180
181 Box::new(listener)
182 }
183 _ => {
184 serve_stream(tokio::io::stdin(), tokio::io::stderr(), params).await;
185 return Ok(0);
186 }
187 };
188
189 let mut servers = FuturesUnordered::new();
190
191 loop {
192 tokio::select! {
193 Some(_) = servers.next() => {},
194 socket = listener.accept_rw() => {
195 match socket {
196 Ok((read, write)) => servers.push(serve_stream(read, write, params.clone())),
197 Err(e) => {
198 error!(params.log, &format!("Error accepting connection: {}", e));
199 return Ok(1);
200 }
201 }
202 },
203 _ = params.exit_barrier.wait() => {
204 while (servers.next().await).is_some() { }
206 return Ok(0);
207 }
208 }
209 }
210}
211
212pub async fn service(
213 ctx: CommandContext,
214 service_args: TunnelServiceSubCommands,
215) -> Result<i32, AnyError> {
216 let manager = create_service_manager(ctx.log.clone(), &ctx.paths);
217 match service_args {
218 TunnelServiceSubCommands::Install(args) => {
219 let auth = Auth::new(&ctx.paths, ctx.log.clone());
220
221 if let Some(name) = &args.name {
222 dev_tunnels::DevTunnels::new_remote_tunnel(&ctx.log, auth, &ctx.paths)
224 .rename_tunnel(name)
225 .await?;
226 } else {
227 auth.get_credential().await?;
229 }
230
231 legal::require_consent(&ctx.paths, args.accept_server_license_terms)?;
233
234 let current_exe = canonical_exe().map_err(|e| wrap(e, "could not get current exe"))?;
235
236 manager
237 .register(
238 current_exe,
239 &[
240 "--verbose",
241 "--cli-data-dir",
242 ctx.paths.root().as_os_str().to_string_lossy().as_ref(),
243 "tunnel",
244 "service",
245 "internal-run",
246 ],
247 )
248 .await?;
249 ctx.log.result(format!("Service successfully installed! You can use `{} tunnel service log` to monitor it, and `{} tunnel service uninstall` to remove it.", APPLICATION_NAME, APPLICATION_NAME));
250 }
251 TunnelServiceSubCommands::Uninstall => {
252 manager.unregister().await?;
253 }
254 TunnelServiceSubCommands::Log => {
255 manager.show_logs().await?;
256 }
257 TunnelServiceSubCommands::InternalRun => {
258 manager
259 .run(ctx.paths.clone(), TunnelServiceContainer::new(ctx.args))
260 .await?;
261 }
262 }
263
264 Ok(0)
265}
266
267pub async fn user(ctx: CommandContext, user_args: TunnelUserSubCommands) -> Result<i32, AnyError> {
268 let auth = Auth::new(&ctx.paths, ctx.log.clone());
269 match user_args {
270 TunnelUserSubCommands::Login(login_args) => {
271 auth.login(
272 login_args.provider.map(|p| p.into()),
273 login_args.access_token.to_owned(),
274 )
275 .await?;
276 }
277 TunnelUserSubCommands::Logout => {
278 auth.clear_credentials()?;
279 }
280 TunnelUserSubCommands::Show => {
281 if let Ok(Some(_)) = auth.get_current_credential() {
282 ctx.log.result("logged in");
283 } else {
284 ctx.log.result("not logged in");
285 return Ok(1);
286 }
287 }
288 }
289
290 Ok(0)
291}
292
293pub async fn rename(ctx: CommandContext, rename_args: TunnelRenameArgs) -> Result<i32, AnyError> {
295 let auth = Auth::new(&ctx.paths, ctx.log.clone());
296 let mut dt = dev_tunnels::DevTunnels::new_remote_tunnel(&ctx.log, auth, &ctx.paths);
297 dt.rename_tunnel(&rename_args.name).await?;
298 ctx.log.result(format!(
299 "Successfully renamed this tunnel to {}",
300 &rename_args.name
301 ));
302
303 Ok(0)
304}
305
306pub async fn unregister(ctx: CommandContext) -> Result<i32, AnyError> {
308 let auth = Auth::new(&ctx.paths, ctx.log.clone());
309 let mut dt = dev_tunnels::DevTunnels::new_remote_tunnel(&ctx.log, auth, &ctx.paths);
310 dt.remove_tunnel().await?;
311 Ok(0)
312}
313
314pub async fn restart(ctx: CommandContext) -> Result<i32, AnyError> {
315 do_single_rpc_call::<_, ()>(
316 &ctx.paths.tunnel_lockfile(),
317 ctx.log,
318 protocol::singleton::METHOD_RESTART,
319 protocol::EmptyObject {},
320 )
321 .await
322 .map(|_| 0)
323 .map_err(|e| e.into())
324}
325
326pub async fn kill(ctx: CommandContext) -> Result<i32, AnyError> {
327 do_single_rpc_call::<_, ()>(
328 &ctx.paths.tunnel_lockfile(),
329 ctx.log,
330 protocol::singleton::METHOD_SHUTDOWN,
331 protocol::EmptyObject {},
332 )
333 .await
334 .map(|_| 0)
335 .map_err(|e| e.into())
336}
337
338#[derive(Serialize)]
339pub struct StatusOutput {
340 pub tunnel: Option<protocol::singleton::StatusWithTunnelName>,
341 pub service_installed: bool,
342}
343
344pub async fn status(ctx: CommandContext) -> Result<i32, AnyError> {
345 let tunnel = do_single_rpc_call::<_, protocol::singleton::StatusWithTunnelName>(
346 &ctx.paths.tunnel_lockfile(),
347 ctx.log.clone(),
348 protocol::singleton::METHOD_STATUS,
349 protocol::EmptyObject {},
350 )
351 .await;
352
353 let service_installed = create_service_manager(ctx.log.clone(), &ctx.paths)
354 .is_installed()
355 .await
356 .unwrap_or(false);
357
358 ctx.log.result(
359 serde_json::to_string(&StatusOutput {
360 service_installed,
361 tunnel: match tunnel {
362 Ok(s) => Some(s),
363 Err(CodeError::NoRunningTunnel | CodeError::AsyncPipeFailed(_)) => None,
364 Err(e) => return Err(e.into()),
365 },
366 })
367 .unwrap(),
368 );
369
370 Ok(0)
371}
372
373pub async fn prune(ctx: CommandContext) -> Result<i32, AnyError> {
375 get_all_servers(&ctx.paths)
376 .into_iter()
377 .map(|s| s.server_paths(&ctx.paths))
378 .filter(|s| s.get_running_pid().is_none())
379 .try_for_each(|s| {
380 ctx.log
381 .result(format!("Deleted {}", s.server_dir.display()));
382 s.delete()
383 })
384 .map_err(AnyError::from)?;
385
386 ctx.log.result("Successfully removed all unused servers");
387
388 Ok(0)
389}
390
391pub async fn serve(ctx: CommandContext, gateway_args: TunnelServeArgs) -> Result<i32, AnyError> {
393 let CommandContext {
394 log, paths, args, ..
395 } = ctx;
396
397 let no_sleep = match gateway_args.no_sleep.then(SleepInhibitor::new) {
398 Some(i) => match i.await {
399 Ok(i) => Some(i),
400 Err(e) => {
401 warning!(log, "Could not inhibit sleep: {}", e);
402 None
403 }
404 },
405 None => None,
406 };
407
408 legal::require_consent(&paths, gateway_args.accept_server_license_terms)?;
409
410 let mut csa = (&args).into();
411 gateway_args.apply_to_server_args(&mut csa);
412 let result = serve_with_csa(paths, log, gateway_args, csa, TUNNEL_CLI_LOCK_NAME).await;
413 drop(no_sleep);
414
415 result
416}
417
418pub async fn forward(
425 ctx: CommandContext,
426 mut forward_args: TunnelForwardArgs,
427) -> Result<i32, AnyError> {
428 if *IS_A_TTY {
431 trace!(ctx.log, "port forwarding is an internal preview feature");
432 }
433
434 let (own_ports_tx, own_ports_rx) = watch::channel(vec![]);
436 let ports_process_log = ctx.log.clone();
437 tokio::spawn(async move {
438 let mut lines = BufReader::new(tokio::io::stdin()).lines();
439 while let Ok(Some(line)) = lines.next_line().await {
440 match serde_json::from_str(&line) {
441 Ok(p) => {
442 let _ = own_ports_tx.send(p);
443 }
444 Err(e) => warning!(ports_process_log, "error parsing ports: {}", e),
445 }
446 }
447 });
448
449 let shutdown = ShutdownRequest::create_rx([ShutdownRequest::CtrlC]);
451 let server = loop {
452 if shutdown.is_open() {
453 return Ok(0);
454 }
455
456 match acquire_singleton(&ctx.paths.forwarding_lockfile()).await {
457 Ok(SingletonConnection::Client(stream)) => {
458 debug!(ctx.log, "starting as client to singleton");
459 let r = local_forwarding::client(local_forwarding::SingletonClientArgs {
460 log: ctx.log.clone(),
461 shutdown: shutdown.clone(),
462 stream,
463 port_requests: own_ports_rx.clone(),
464 })
465 .await;
466 if let Err(e) = r {
467 warning!(ctx.log, "error contacting forwarding singleton: {}", e);
468 }
469 }
470 Ok(SingletonConnection::Singleton(server)) => break server,
471 Err(e) => {
472 warning!(ctx.log, "error access singleton, retrying: {}", e);
473 tokio::time::sleep(Duration::from_secs(2)).await
474 }
475 }
476 };
477
478 let auth = Auth::new(&ctx.paths, ctx.log.clone());
480 if let (Some(p), Some(at)) = (
481 forward_args.login.provider.take(),
482 forward_args.login.access_token.take(),
483 ) {
484 auth.login(Some(p.into()), Some(at)).await?;
485 }
486
487 let mut tunnels = DevTunnels::new_port_forwarding(&ctx.log, auth, &ctx.paths);
488 let tunnel = tunnels
489 .start_new_launcher_tunnel(None, true, &forward_args.ports)
490 .await?;
491
492 local_forwarding::server(ctx.log, tunnel, server, own_ports_rx, shutdown).await?;
493
494 Ok(0)
495}
496
497fn get_connection_token(tunnel: &ActiveTunnel) -> String {
498 let mut hash = Sha256::new();
499 hash.update(tunnel.id.as_bytes());
500 let result = hash.finalize();
501 b64::URL_SAFE_NO_PAD.encode(result)
502}
503
504async fn serve_with_csa(
505 paths: LauncherPaths,
506 mut log: log::Logger,
507 gateway_args: TunnelServeArgs,
508 mut csa: CodeServerArgs,
509 app_mutex_name: Option<&'static str>,
510) -> Result<i32, AnyError> {
511 let log_broadcast = BroadcastLogSink::new();
512 log = log.tee(log_broadcast.clone());
513 log::install_global_logger(log.clone()); debug!(
516 log,
517 "Starting tunnel with `{} {}`",
518 APPLICATION_NAME,
519 std::env::args().collect::<Vec<_>>().join(" ")
520 );
521
522 let current_exe = std::env::current_exe().unwrap();
526
527 let mut vec = vec![
528 ShutdownRequest::CtrlC,
529 ShutdownRequest::ExeUninstalled(current_exe.to_owned()),
530 ];
531 if let Some(p) = gateway_args
532 .parent_process_id
533 .and_then(|p| Pid::from_str(&p).ok())
534 {
535 vec.push(ShutdownRequest::ParentProcessKilled(p));
536 }
537 let mut shutdown = ShutdownRequest::create_rx(vec);
538
539 let server = loop {
540 if shutdown.is_open() {
541 return Ok(0);
542 }
543
544 match acquire_singleton(&paths.tunnel_lockfile()).await {
545 Ok(SingletonConnection::Client(stream)) => {
546 debug!(log, "starting as client to singleton");
547 let should_exit = start_singleton_client(SingletonClientArgs {
548 log: log.clone(),
549 shutdown: shutdown.clone(),
550 stream,
551 })
552 .await;
553 if should_exit {
554 return Ok(0);
555 }
556 }
557 Ok(SingletonConnection::Singleton(server)) => break server,
558 Err(e) => {
559 warning!(log, "error access singleton, retrying: {}", e);
560 tokio::time::sleep(Duration::from_secs(2)).await
561 }
562 }
563 };
564
565 debug!(log, "starting as new singleton");
566
567 let mut server =
568 make_singleton_server(log_broadcast.clone(), log.clone(), server, shutdown.clone());
569 let platform = spanf!(log, log.span("prereq"), PreReqChecker::new().verify())?;
570 let _lock = app_mutex_name.map(AppMutex::new);
571
572 let auth = Auth::new(&paths, log.clone());
573 let mut dt = dev_tunnels::DevTunnels::new_remote_tunnel(&log, auth, &paths);
574 loop {
575 let tunnel = if let Some(t) =
576 fulfill_existing_tunnel_args(gateway_args.tunnel.clone(), &gateway_args.name)
577 {
578 dt.start_existing_tunnel(t).await
579 } else {
580 tokio::select! {
581 t = dt.start_new_launcher_tunnel(gateway_args.name.as_deref(), gateway_args.random_name, &[CONTROL_PORT]) => t,
582 _ = shutdown.wait() => return Ok(1),
583 }
584 }?;
585
586 csa.connection_token = Some(get_connection_token(&tunnel));
587
588 let mut r = start_singleton_server(SingletonServerArgs {
589 log: log.clone(),
590 tunnel,
591 paths: &paths,
592 code_server_args: &csa,
593 platform,
594 log_broadcast: &log_broadcast,
595 shutdown: shutdown.clone(),
596 server: &mut server,
597 })
598 .await?;
599 r.tunnel.close().await.ok();
600
601 match r.next {
602 Next::Respawn => {
603 warning!(log, "respawn requested, starting new server");
604 let args = std::env::args().skip(1).collect::<Vec<String>>();
607 let exit = new_std_command(current_exe)
608 .args(args)
609 .spawn()
610 .map_err(|e| wrap(e, "error respawning after update"))?
611 .wait()
612 .map_err(|e| wrap(e, "error waiting for child"))?;
613
614 return Ok(exit.code().unwrap_or(1));
615 }
616 Next::Exit => {
617 debug!(log, "Tunnel shut down");
618 return Ok(0);
619 }
620 Next::Restart => continue,
621 }
622 }
623}