radicle_cli/commands/node/
control.rs1use std::collections::HashMap;
2use std::ffi::OsString;
3use std::fs::File;
4use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
5use std::{path::Path, process, thread, time};
6
7use anyhow::{anyhow, Context};
8use localtime::LocalTime;
9
10use radicle::node;
11use radicle::node::{Address, ConnectResult, Handle as _, NodeId};
12use radicle::profile::env::RAD_PASSPHRASE;
13use radicle::Node;
14use radicle::{profile, Profile};
15
16use crate::commands::node::logs::{LogRotatorFileSystem, Rotated};
17use crate::terminal as term;
18use crate::terminal::Element as _;
19
/// Maximum time [`start`] waits for a daemonized node to report itself as
/// running before giving up with an error.
pub const NODE_START_TIMEOUT: time::Duration = time::Duration::from_secs(6);
22
23pub fn start(
24 node: Node,
25 daemon: bool,
26 verbose: bool,
27 mut options: Vec<OsString>,
28 cmd: &Path,
29 profile: &Profile,
30) -> anyhow::Result<()> {
31 if node.is_running() {
32 term::success!("Node is already running.");
33 return Ok(());
34 }
35 let envs = if profile.keystore.is_encrypted()? {
36 let validator = term::io::PassphraseValidator::new(profile.keystore.clone());
39 let passphrase = if let Some(phrase) = profile::env::passphrase() {
40 phrase
41 } else if let Some(phrase) = term::io::passphrase(validator)? {
42 phrase
43 } else {
44 anyhow::bail!(
45 "A passphrase is required to read your Radicle key in order to start the node. Unable to continue. Consider setting the environment variable `{RAD_PASSPHRASE}`."
46 );
47 };
48 Some((profile::env::RAD_PASSPHRASE, passphrase))
49 } else {
50 None
51 };
52
53 if !options.contains(&OsString::from("--force")) {
56 options.push(OsString::from("--force"));
57 }
58
59 let Rotated {
60 path: log_path,
61 log: log_file,
62 } = LogRotatorFileSystem::from_profile(profile).rotate()?;
63
64 if daemon {
65 let child = process::Command::new(cmd)
66 .args(options)
67 .envs(envs)
68 .stdin(process::Stdio::null())
69 .stdout(process::Stdio::from(log_file.try_clone()?))
70 .stderr(process::Stdio::from(log_file))
71 .spawn()
72 .map_err(|e| anyhow!("failed to start node process {cmd:?}: {e}"))?;
73 let pid = term::format::parens(term::format::dim(child.id()));
74
75 if verbose {
76 logs(0, Some(time::Duration::from_secs(1)), profile)?;
77 } else {
78 let started = time::Instant::now();
79 let mut spinner = term::spinner(format!("Node starting.. {pid}"));
80
81 loop {
82 if node.is_running() {
83 spinner.message(format!("Node started {pid}"));
84 spinner.finish();
85
86 term::print(term::format::dim(
87 "To stay in sync with the network, leave the node running in the background.",
88 ));
89 term::info!(
90 "{} {}{}",
91 term::format::dim("To learn more, run"),
92 term::format::command("rad node --help"),
93 term::format::dim("."),
94 );
95 break;
96 } else if started.elapsed() >= NODE_START_TIMEOUT {
97 anyhow::bail!(
98 "node failed to start. Try running it with `rad node start --foreground`, \
99 or check the logs with `rad node logs`"
100 );
101 }
102 thread::sleep(time::Duration::from_millis(60));
103 }
104 }
105 } else {
106 let mut log_file = log_file;
108 let _ = log_file.write_all(format!("radicle-node started in foreground, no futher log messages are written to '{}' (this file).\n", log_path.display()).as_bytes());
109
110 let mut child = process::Command::new(cmd)
111 .args(options)
112 .envs(envs)
113 .spawn()
114 .map_err(|e| anyhow!("failed to start node process {cmd:?}: {e}"))?;
115
116 child.wait()?;
117 }
118
119 Ok(())
120}
121
122pub fn stop(node: Node, profile: &Profile) {
123 let mut spinner = term::spinner("Stopping node...");
124 if node.shutdown().is_err() {
125 spinner.error("node is not running");
126 } else {
127 spinner.message("Node stopped");
128 spinner.finish();
129 }
130 let rotator = LogRotatorFileSystem::from_profile(profile);
131 rotator.remove().ok();
132}
133
134pub fn debug(node: &mut Node) -> anyhow::Result<()> {
135 let json = node.debug()?;
136 term::json::to_pretty(&json, Path::new("debug.json"))?.print();
137
138 Ok(())
139}
140
141pub fn logs(lines: usize, follow: Option<time::Duration>, profile: &Profile) -> anyhow::Result<()> {
142 let logs_path = profile.home.node().join("node.log");
143 let mut file = File::open(logs_path.clone())
144 .map(BufReader::new)
145 .with_context(|| {
146 format!(
147 "Failed to read log file at '{}'. Did you start the node with `rad node start`? \
148 If the node was started through a process manager, check its logs instead.",
149 logs_path.display()
150 )
151 })?;
152
153 file.seek(SeekFrom::End(0))?;
154
155 let mut tail = Vec::new();
156 let mut nlines = 0;
157
158 for i in (1..=file.stream_position()?).rev() {
159 let mut buf = [0; 1];
160 file.seek(SeekFrom::Start(i - 1))?;
161 file.read_exact(&mut buf)?;
162
163 if buf[0] == b'\n' {
164 nlines += 1;
165 }
166 if nlines > lines {
167 break;
168 }
169 tail.push(buf[0]);
170 }
171 tail.reverse();
172
173 print!("{}", term::format::dim(String::from_utf8_lossy(&tail)));
174
175 if let Some(timeout) = follow {
176 file.seek(SeekFrom::End(0))?;
177
178 let start = time::Instant::now();
179
180 while start.elapsed() < timeout {
181 let mut line = String::new();
182 let len = file.read_line(&mut line)?;
183
184 if len == 0 {
185 thread::sleep(time::Duration::from_millis(250));
186 } else {
187 print!("{}", term::format::dim(line));
188 }
189 }
190 }
191 Ok(())
192}
193
194pub fn connect(
195 node: &mut Node,
196 nid: NodeId,
197 addr: Address,
198 timeout: time::Duration,
199) -> anyhow::Result<()> {
200 let spinner = term::spinner(format!(
201 "Connecting to {}@{addr}...",
202 term::format::node_id_human_compact(&nid)
203 ));
204 match node.connect(
205 nid,
206 addr,
207 node::ConnectOptions {
208 persistent: true,
209 timeout,
210 },
211 ) {
212 Ok(ConnectResult::Connected) => spinner.finish(),
213 Ok(ConnectResult::Disconnected { reason }) => spinner.error(reason),
214 Err(err) => return Err(err.into()),
215 }
216 Ok(())
217}
218
219pub fn connect_many(
220 node: &mut Node,
221 nid: NodeId,
222 addrs: Vec<Address>,
223 timeout: time::Duration,
224) -> anyhow::Result<()> {
225 let mut spinner = term::spinner("Connecting...");
226 let mut errors = HashMap::new();
227 for addr in addrs {
228 spinner.message(format!(
229 "Connecting to {}@{addr}...",
230 term::format::node_id_human_compact(&nid)
231 ));
232 match node.connect(
233 nid,
234 addr.clone(),
235 node::ConnectOptions {
236 persistent: true,
237 timeout,
238 },
239 ) {
240 Ok(ConnectResult::Connected) => {
241 spinner.finish();
242 return Ok(());
243 }
244 Ok(ConnectResult::Disconnected { reason }) => {
245 errors.insert(addr, reason);
246 }
247 Err(err) => {
248 errors.insert(addr, err.to_string());
249 }
250 }
251 }
252 spinner.failed();
253 for (addr, err) in errors {
254 term::error(format!("Failed to connect to {addr}: {err}"));
255 }
256 Ok(())
257}
258
259pub fn status(node: &Node, profile: &Profile) -> anyhow::Result<()> {
260 for warning in crate::warning::nodes_renamed(&profile.config) {
261 term::warning(warning);
262 }
263
264 if !node.is_running() {
265 term::info!("Node is {}.", term::format::negative("stopped"));
266 term::info!(
267 "To start it, run {}.",
268 term::format::command("rad node start")
269 );
270 return Ok(());
271 }
272
273 let listen = node
274 .listen_addrs()?
275 .into_iter()
276 .map(|addr| addr.to_string())
277 .collect::<Vec<_>>();
278
279 let nid = node.nid()?;
280 let nid = if &nid == profile.id() {
281 term::format::tertiary(term::format::node_id_human(&nid))
282 } else {
283 term::format::yellow(term::format::node_id_human(&nid)).bold()
284 };
285
286 if listen.is_empty() {
287 term::success!(
288 "Node is {} with Node ID {} and {} configured to listen for inbound connections.",
289 term::format::positive("running"),
290 nid,
291 term::Paint::new("not").italic()
292 );
293 } else {
294 term::success!(
295 "Node is {} with Node ID {} and listening for inbound connections on {}.",
296 term::format::positive("running"),
297 nid,
298 listen.join(", ")
299 );
300 }
301
302 let sessions = sessions(node)?;
303 if let Some(table) = sessions {
304 term::blank();
305 table.print();
306 }
307
308 if profile.hints() {
309 const COLUMN_WIDTH: usize = 12;
310 let status = format!(
311 "\n{:>4} … {}\n {} {}\n {} {}",
312 state_label().fg(radicle_term::Color::White),
313 term::format::dim("Status:"),
314 format_args!(
315 "{} {:width$}",
316 state_connected(),
317 term::format::dim("… connected"),
318 width = COLUMN_WIDTH,
319 ),
320 format_args!(
321 "{} {}",
322 state_disconnected(),
323 term::format::dim("… disconnected")
324 ),
325 format_args!(
326 "{} {:width$}",
327 state_attempted(),
328 term::format::dim("… attempted"),
329 width = COLUMN_WIDTH,
330 ),
331 format_args!("{} {}", state_initial(), term::format::dim("… initial")),
332 );
333 let link_direction = format!(
334 "\n{:>4} … {}\n {} {}",
335 link_direction_label(),
336 term::format::dim("Link Direction:"),
337 format_args!(
338 "{} {:width$}",
339 link_direction_inbound(),
340 term::format::dim("… inbound"),
341 width = COLUMN_WIDTH,
342 ),
343 format_args!(
344 "{} {}",
345 link_direction_outbound(),
346 term::format::dim("… outbound")
347 ),
348 );
349 term::hint(status + &link_direction);
350 }
351
352 if profile.home.node().join("node.log").exists() {
353 term::blank();
354 logs(10, None, profile)?;
357 }
358 Ok(())
359}
360
361pub fn sessions(node: &Node) -> Result<Option<term::Table<5, term::Label>>, node::Error> {
362 let sessions = node.sessions()?;
363 if sessions.is_empty() {
364 return Ok(None);
365 }
366 let mut table = term::Table::new(term::table::TableOptions::bordered());
367 let now = LocalTime::now();
368
369 table.header([
370 term::format::bold("Node ID").into(),
371 term::format::bold("Address").into(),
372 state_label().into(),
373 link_direction_label().bold().into(),
374 term::format::bold("Since").into(),
375 ]);
376 table.divider();
377
378 table.extend(sessions.into_iter().map(|sess| {
379 let nid = term::format::tertiary(term::format::node_id_human(&sess.nid)).into();
380 let (addr, state, time) = match sess.state {
381 node::State::Initial => (
382 term::Label::blank(),
383 term::Label::from(state_initial()),
384 term::Label::blank(),
385 ),
386 node::State::Attempted => (
387 term::format::addr_compact(&sess.addr).into(),
388 term::Label::from(state_attempted()),
389 term::Label::blank(),
390 ),
391 node::State::Connected { since, .. } => (
392 term::format::addr_compact(&sess.addr).into(),
393 term::Label::from(state_connected()),
394 term::format::dim(now - since).into(),
395 ),
396 node::State::Disconnected { since, .. } => (
397 term::format::addr_compact(&sess.addr).into(),
398 term::Label::from(state_disconnected()),
399 term::format::dim(now - since).into(),
400 ),
401 };
402 let direction = match sess.link {
403 node::Link::Inbound => term::Label::from(link_direction_inbound()),
404 node::Link::Outbound => term::Label::from(link_direction_outbound()),
405 };
406
407 [nid, addr, state, direction, time]
408 }));
409
410 Ok(Some(table))
411}
412
413pub fn config(node: &Node) -> anyhow::Result<()> {
414 let cfg = node.config()?;
415 let cfg = serde_json::to_string_pretty(&cfg)?;
416
417 println!("{cfg}");
418
419 Ok(())
420}
421
422fn state_label() -> term::Paint<String> {
423 term::Paint::from("?".to_string())
424}
425
426fn state_initial() -> term::Paint<String> {
427 term::format::dim("•".to_string())
428}
429
430fn state_attempted() -> term::Paint<String> {
431 term::PREFIX_WARNING.into()
432}
433
434fn state_connected() -> term::Paint<String> {
435 term::PREFIX_SUCCESS.into()
436}
437
438fn state_disconnected() -> term::Paint<String> {
439 term::PREFIX_ERROR.into()
440}
441
442fn link_direction_label() -> term::Paint<String> {
443 term::Paint::from("⤭".to_string())
444}
445
446fn link_direction_inbound() -> term::Paint<String> {
447 term::format::yellow("↘".to_string())
448}
449
450fn link_direction_outbound() -> term::Paint<String> {
451 term::format::dim("↗".to_string())
452}