cli/tunnels/
singleton_client.rs1use std::{
7 path::Path,
8 sync::{
9 atomic::{AtomicBool, Ordering},
10 Arc,
11 },
12 thread,
13};
14
15use const_format::concatcp;
16use tokio::sync::mpsc;
17
18use crate::{
19 async_pipe::{socket_stream_split, AsyncPipe},
20 constants::IS_INTERACTIVE_CLI,
21 json_rpc::{new_json_rpc, start_json_rpc, JsonRpcSerializer},
22 log,
23 rpc::RpcCaller,
24 singleton::connect_as_client,
25 tunnels::{code_server::print_listening, protocol::EmptyObject},
26 util::{errors::CodeError, sync::Barrier},
27};
28
29use super::{
30 protocol,
31 shutdown_signal::{ShutdownRequest, ShutdownSignal},
32};
33
34pub struct SingletonClientArgs {
35 pub log: log::Logger,
36 pub stream: AsyncPipe,
37 pub shutdown: Barrier<ShutdownSignal>,
38}
39
40struct SingletonServerContext {
41 log: log::Logger,
42 exit_entirely: Arc<AtomicBool>,
43 caller: RpcCaller<JsonRpcSerializer>,
44}
45
46const CONTROL_INSTRUCTIONS_COMMON: &str =
47 "Connected to an existing tunnel process running on this machine.";
48
49const CONTROL_INSTRUCTIONS_INTERACTIVE: &str = concatcp!(
50 CONTROL_INSTRUCTIONS_COMMON,
51 " You can press:
52
53- \"x\" + Enter to stop the tunnel and exit
54- \"r\" + Enter to restart the tunnel
55- Ctrl+C to detach
56"
57);
58
59pub async fn start_singleton_client(args: SingletonClientArgs) -> bool {
62 let mut rpc = new_json_rpc();
63 let (msg_tx, msg_rx) = mpsc::unbounded_channel();
64 let exit_entirely = Arc::new(AtomicBool::new(false));
65
66 debug!(
67 args.log,
68 "An existing tunnel is running on this machine, connecting to it..."
69 );
70
71 if *IS_INTERACTIVE_CLI {
72 let stdin_handle = rpc.get_caller(msg_tx.clone());
73 thread::spawn(move || {
74 let mut input = String::new();
75 loop {
76 input.truncate(0);
77 match std::io::stdin().read_line(&mut input) {
78 Err(_) | Ok(0) => return, _ => {}
80 };
81
82 match input.chars().next().map(|c| c.to_ascii_lowercase()) {
83 Some('x') => {
84 stdin_handle.notify(protocol::singleton::METHOD_SHUTDOWN, EmptyObject {});
85 return;
86 }
87 Some('r') => {
88 stdin_handle.notify(protocol::singleton::METHOD_RESTART, EmptyObject {});
89 }
90 Some(_) | None => {}
91 }
92 }
93 });
94 }
95
96 let caller = rpc.get_caller(msg_tx);
97 let mut rpc = rpc.methods(SingletonServerContext {
98 log: args.log.clone(),
99 exit_entirely: exit_entirely.clone(),
100 caller,
101 });
102
103 rpc.register_sync(protocol::singleton::METHOD_SHUTDOWN, |_: EmptyObject, c| {
104 c.exit_entirely.store(true, Ordering::SeqCst);
105 Ok(())
106 });
107
108 rpc.register_async(
109 protocol::singleton::METHOD_LOG_REPLY_DONE,
110 |_: EmptyObject, c| async move {
111 c.log.result(if *IS_INTERACTIVE_CLI {
112 CONTROL_INSTRUCTIONS_INTERACTIVE
113 } else {
114 CONTROL_INSTRUCTIONS_COMMON
115 });
116
117 let res = c
118 .caller
119 .call::<_, _, protocol::singleton::StatusWithTunnelName>(
120 protocol::singleton::METHOD_STATUS,
121 protocol::EmptyObject {},
122 );
123
124 if let Ok(Ok(s)) = res.await {
128 if let Some(name) = s.name {
129 print_listening(&c.log, &name);
130 }
131 }
132
133 Ok(())
134 },
135 );
136
137 rpc.register_sync(
138 protocol::singleton::METHOD_LOG,
139 |log: protocol::singleton::LogMessageOwned, c| {
140 match log.level {
141 Some(level) => c.log.emit(level, &format!("{}{}", log.prefix, log.message)),
142 None => c.log.result(format!("{}{}", log.prefix, log.message)),
143 }
144 Ok(())
145 },
146 );
147
148 let (read, write) = socket_stream_split(args.stream);
149 let _ = start_json_rpc(rpc.build(args.log), read, write, msg_rx, args.shutdown).await;
150
151 exit_entirely.load(Ordering::SeqCst)
152}
153
154pub async fn do_single_rpc_call<
155 P: serde::Serialize + 'static,
156 R: serde::de::DeserializeOwned + Send + 'static,
157>(
158 lock_file: &Path,
159 log: log::Logger,
160 method: &'static str,
161 params: P,
162) -> Result<R, CodeError> {
163 let client = match connect_as_client(lock_file).await {
164 Ok(p) => p,
165 Err(CodeError::SingletonLockfileOpenFailed(_))
166 | Err(CodeError::SingletonLockedProcessExited(_)) => {
167 return Err(CodeError::NoRunningTunnel);
168 }
169 Err(e) => return Err(e),
170 };
171
172 let (msg_tx, msg_rx) = mpsc::unbounded_channel();
173 let mut rpc = new_json_rpc();
174 let caller = rpc.get_caller(msg_tx);
175 let (read, write) = socket_stream_split(client);
176
177 let rpc = tokio::spawn(async move {
178 start_json_rpc(
179 rpc.methods(()).build(log),
180 read,
181 write,
182 msg_rx,
183 ShutdownRequest::create_rx([ShutdownRequest::CtrlC]),
184 )
185 .await
186 .unwrap();
187 });
188
189 let r = caller.call(method, params).await.unwrap();
190 rpc.abort();
191 r.map_err(CodeError::TunnelRpcCallFailed)
192}