forest/cli/subcommands/
sync_cmd.rs1use crate::blocks::TipsetKey;
5use crate::chain_sync::{ForkSyncInfo, NodeSyncStatus, SyncStatusReport};
6use crate::rpc::sync::{SnapshotProgressState, SyncStatus};
7use crate::rpc::{self, prelude::*};
8use anyhow::Context;
9use cid::Cid;
10use clap::Subcommand;
11use std::{
12 io::{Write, stdout},
13 time::Duration,
14};
15use tokio::time;
16use tokio::time::sleep;
17
18#[derive(Debug, Subcommand)]
19pub enum SyncCommands {
20 Wait {
22 #[arg(short)]
24 watch: bool,
25 },
26 Status,
28 CheckBad {
30 #[arg(short)]
31 cid: Cid,
33 },
34 MarkBad {
36 #[arg(short)]
38 cid: Cid,
39 },
40}
41
42impl SyncCommands {
43 pub async fn run(self, client: rpc::Client) -> anyhow::Result<()> {
44 match self {
45 Self::Wait { watch } => {
46 let mut stdout = stdout();
47 let mut lines_printed_last_iteration = 0;
48
49 handle_initial_snapshot_check(&client).await?;
50
51 let mut interval = tokio::time::interval(Duration::from_secs(1));
52 loop {
53 interval.tick().await;
54 let report = SyncStatus::call(&client, ())
55 .await
56 .context("Failed to get sync status")?;
57
58 wait_for_node_to_start_syncing(&client).await?;
59
60 clear_previous_lines(&mut stdout, lines_printed_last_iteration)?;
61
62 lines_printed_last_iteration = print_sync_report_details(&report)
63 .context("Failed to print sync status report")?;
64
65 if !watch && report.status == NodeSyncStatus::Synced {
67 println!("\nSync complete!");
68 break;
69 }
70 }
71
72 Ok(())
73 }
74
75 Self::Status => {
76 let sync_status = client.call(SyncStatus::request(())?).await?;
77 if sync_status.status == NodeSyncStatus::Initializing {
78 if !check_snapshot_progress(&client, false)
80 .await?
81 .is_not_required()
82 {
83 println!("Please try again later, once the snapshot is downloaded...");
84 return Ok(());
85 };
86 }
87
88 _ = print_sync_report_details(&sync_status)
90 .context("Failed to print sync status report")?;
91
92 Ok(())
93 }
94 Self::CheckBad { cid } => {
95 let response = SyncCheckBad::call(&client, (cid,)).await?;
96 if response.is_empty() {
97 println!("Block \"{cid}\" is not marked as a bad block");
98 } else {
99 println!("{response}");
100 }
101 Ok(())
102 }
103 Self::MarkBad { cid } => {
104 SyncMarkBad::call(&client, (cid,)).await?;
105 println!("OK");
106 Ok(())
107 }
108 }
109 }
110}
111
112fn print_sync_report_details(report: &SyncStatusReport) -> anyhow::Result<usize> {
114 let mut lines_printed_count = 0;
115
116 println!(
117 "Status: {:?} ({} epochs behind)",
118 report.status, report.epochs_behind
119 );
120 lines_printed_count += 1;
121
122 let head_key_str = report
123 .current_head_key
124 .as_ref()
125 .map(tipset_key_to_string)
126 .unwrap_or_else(|| "[unknown]".to_string());
127 println!(
128 "Node Head: Epoch {} ({})",
129 report.current_head_epoch, head_key_str
130 );
131 lines_printed_count += 1;
132
133 println!("Network Head: Epoch {}", report.network_head_epoch);
134 lines_printed_count += 1;
135
136 println!("Last Update: {}", report.last_updated.to_rfc3339());
137 lines_printed_count += 1;
138
139 let active_forks = &report.active_forks;
141 if active_forks.is_empty() {
142 println!("Active Sync Tasks: None");
143 lines_printed_count += 1;
144 } else {
145 println!("Active Sync Tasks:");
146 lines_printed_count += 1;
147 let mut sorted_forks = active_forks.clone();
148 sorted_forks.sort_by_key(|f| std::cmp::Reverse(f.target_epoch));
149 for fork in &sorted_forks {
150 lines_printed_count += print_fork_sync_info(fork)?;
154 }
155 }
156
157 Ok(lines_printed_count)
158}
159
160fn print_fork_sync_info(fork: &ForkSyncInfo) -> anyhow::Result<usize> {
162 let total_epochs_for_this_fork = fork
163 .target_epoch
164 .saturating_sub(fork.target_sync_epoch_start);
165 println!(
166 " - Fork Target: {} ({}), Stage: {}, Syncing Range: [{}..{}] ({} epochs)",
167 fork.target_epoch,
168 tipset_key_to_string(&fork.target_tipset_key),
169 &fork.stage,
170 fork.target_sync_epoch_start,
171 fork.target_epoch,
172 total_epochs_for_this_fork
173 );
174 Ok(1)
175}
176
177fn clear_previous_lines(stdout: &mut std::io::Stdout, lines: usize) -> anyhow::Result<()> {
178 if lines > 0 {
179 write!(
181 stdout,
182 "\r{}{}",
183 anes::MoveCursorUp(lines as u16),
184 anes::ClearBuffer::Below,
185 )?;
186 }
187 Ok(())
188}
189
190fn tipset_key_to_string(key: &TipsetKey) -> String {
191 let cids = key.to_cids();
192 match cids.len() {
193 0 => "[]".to_string(),
194 _ => format!("[{}, ...]", cids.first()),
195 }
196}
197
198async fn check_snapshot_progress(
201 client: &rpc::Client,
202 wait: bool,
203) -> anyhow::Result<SnapshotProgressState> {
204 let mut interval = time::interval(Duration::from_secs(5));
205 let mut stdout = stdout();
206 loop {
207 interval.tick().await;
208
209 let progress_state = client.call(SyncSnapshotProgress::request(())?).await?;
210
211 write!(
212 stdout,
213 "\r{}{}Snapshot status: {}\n",
214 anes::MoveCursorUp(1),
215 anes::ClearLine::All,
216 progress_state
217 )?;
218 stdout.flush()?;
219
220 match progress_state {
221 SnapshotProgressState::Completed | SnapshotProgressState::NotRequired => {
222 println!();
223 return Ok(progress_state);
224 }
225 _ if !wait => {
226 return Ok(progress_state);
227 }
228 _ => {} }
230 }
231}
232
233async fn wait_for_node_to_start_syncing(client: &rpc::Client) -> anyhow::Result<()> {
235 let mut is_msg_printed = false;
236 let mut stdout = stdout();
237 const POLLING_INTERVAL: Duration = Duration::from_secs(1);
238
239 loop {
240 let report = SyncStatus::call(client, ())
241 .await
242 .context("Failed to get sync status while waiting for initialization to complete")?;
243
244 if report.status == NodeSyncStatus::Initializing {
245 write!(stdout, "\rš Node syncing is initializing, please wait...")?;
246 stdout.flush()?;
247 is_msg_printed = true;
248
249 sleep(POLLING_INTERVAL).await;
250 } else {
251 if is_msg_printed {
252 clear_previous_lines(&mut stdout, 1)
253 .context("Failed to clear initializing message")?;
254 }
255
256 break;
257 }
258 }
259
260 Ok(())
261}
262
263async fn handle_initial_snapshot_check(client: &rpc::Client) -> anyhow::Result<()> {
266 let initial_report = SyncStatus::call(client, ())
267 .await
268 .context("Failed to get sync status")?;
269 if initial_report.status == NodeSyncStatus::Initializing {
270 if !check_snapshot_progress(client, false)
273 .await?
274 .is_not_required()
275 {
276 check_snapshot_progress(client, true).await?;
277 }
278 }
279
280 Ok(())
281}