forest/cli/subcommands/
sync_cmd.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::blocks::TipsetKey;
5use crate::chain_sync::{ForkSyncInfo, NodeSyncStatus, SyncStatusReport};
6use crate::rpc::sync::{SnapshotProgressState, SyncStatus};
7use crate::rpc::{self, prelude::*};
8use anyhow::Context;
9use cid::Cid;
10use clap::Subcommand;
11use std::{
12    io::{Write, stdout},
13    time::Duration,
14};
15use tokio::time;
16use tokio::time::sleep;
17
// Sync-related CLI subcommands. `SyncCommands::run` maps each variant to the
// RPC call(s) it performs.
//
// NOTE: the `///` comments on the variants and fields below are consumed by
// clap's derive macro as user-visible help text, so editing them changes the
// CLI's `--help` output.
#[derive(Debug, Subcommand)]
pub enum SyncCommands {
    /// Display continuous sync data until sync is complete
    Wait {
        /// Don't exit after node is synced
        // `short` with no explicit value: clap derives the flag letter from the
        // field name.
        #[arg(short)]
        watch: bool,
    },
    /// Check sync status
    Status,
    /// Check if a given block is marked bad, and for what reason
    CheckBad {
        #[arg(short)]
        /// The block CID to check
        cid: Cid,
    },
    /// Mark a given block as bad
    MarkBad {
        /// The block CID to mark as a bad block
        #[arg(short)]
        cid: Cid,
    },
}
41
42impl SyncCommands {
43    pub async fn run(self, client: rpc::Client) -> anyhow::Result<()> {
44        match self {
45            Self::Wait { watch } => {
46                let mut stdout = stdout();
47                let mut lines_printed_last_iteration = 0;
48
49                handle_initial_snapshot_check(&client).await?;
50
51                let mut interval = tokio::time::interval(Duration::from_secs(1));
52                loop {
53                    interval.tick().await;
54                    let report = SyncStatus::call(&client, ())
55                        .await
56                        .context("Failed to get sync status")?;
57
58                    wait_for_node_to_start_syncing(&client).await?;
59
60                    clear_previous_lines(&mut stdout, lines_printed_last_iteration)?;
61
62                    lines_printed_last_iteration = print_sync_report_details(&report)
63                        .context("Failed to print sync status report")?;
64
65                    // Exit if synced and not in watch mode.
66                    if !watch && report.status == NodeSyncStatus::Synced {
67                        println!("\nSync complete!");
68                        break;
69                    }
70                }
71
72                Ok(())
73            }
74
75            Self::Status => {
76                let sync_status = client.call(SyncStatus::request(())?).await?;
77                if sync_status.status == NodeSyncStatus::Initializing {
78                    // If a snapshot is required and not yet complete, return here
79                    if !check_snapshot_progress(&client, false)
80                        .await?
81                        .is_not_required()
82                    {
83                        println!("Please try again later, once the snapshot is downloaded...");
84                        return Ok(());
85                    };
86                }
87
88                // Print the status report once, without line counting for clearing
89                _ = print_sync_report_details(&sync_status)
90                    .context("Failed to print sync status report")?;
91
92                Ok(())
93            }
94            Self::CheckBad { cid } => {
95                let response = SyncCheckBad::call(&client, (cid,)).await?;
96                if response.is_empty() {
97                    println!("Block \"{cid}\" is not marked as a bad block");
98                } else {
99                    println!("{response}");
100                }
101                Ok(())
102            }
103            Self::MarkBad { cid } => {
104                SyncMarkBad::call(&client, (cid,)).await?;
105                println!("OK");
106                Ok(())
107            }
108        }
109    }
110}
111
112/// Prints the sync status report details and returns the number of lines printed.
113fn print_sync_report_details(report: &SyncStatusReport) -> anyhow::Result<usize> {
114    let mut lines_printed_count = 0;
115
116    println!(
117        "Status: {:?} ({} epochs behind)",
118        report.status, report.epochs_behind
119    );
120    lines_printed_count += 1;
121
122    let head_key_str = report
123        .current_head_key
124        .as_ref()
125        .map(tipset_key_to_string)
126        .unwrap_or_else(|| "[unknown]".to_string());
127    println!(
128        "Node Head: Epoch {} ({})",
129        report.current_head_epoch, head_key_str
130    );
131    lines_printed_count += 1;
132
133    println!("Network Head: Epoch {}", report.network_head_epoch);
134    lines_printed_count += 1;
135
136    println!("Last Update: {}", report.last_updated.to_rfc3339());
137    lines_printed_count += 1;
138
139    // Print active sync tasks (forks)
140    let active_forks = &report.active_forks;
141    if active_forks.is_empty() {
142        println!("Active Sync Tasks: None");
143        lines_printed_count += 1;
144    } else {
145        println!("Active Sync Tasks:");
146        lines_printed_count += 1;
147        let mut sorted_forks = active_forks.clone();
148        sorted_forks.sort_by_key(|f| std::cmp::Reverse(f.target_epoch));
149        for fork in &sorted_forks {
150            // Assuming print_fork_sync_info exists and increments line_count internally if needed
151            // If print_fork_sync_info doesn't increment, adjust line_count here.
152            // For simplicity, assuming it behaves as needed or is adjusted elsewhere.
153            lines_printed_count += print_fork_sync_info(fork)?;
154        }
155    }
156
157    Ok(lines_printed_count)
158}
159
160/// Prints fork sync info and returns the number of lines printed (expected to be 1).
161fn print_fork_sync_info(fork: &ForkSyncInfo) -> anyhow::Result<usize> {
162    let total_epochs_for_this_fork = fork
163        .target_epoch
164        .saturating_sub(fork.target_sync_epoch_start);
165    println!(
166        "  - Fork Target: {} ({}), Stage: {}, Syncing Range: [{}..{}] ({} epochs)",
167        fork.target_epoch,
168        tipset_key_to_string(&fork.target_tipset_key),
169        &fork.stage,
170        fork.target_sync_epoch_start,
171        fork.target_epoch,
172        total_epochs_for_this_fork
173    );
174    Ok(1)
175}
176
177fn clear_previous_lines(stdout: &mut std::io::Stdout, lines: usize) -> anyhow::Result<()> {
178    if lines > 0 {
179        // Move cursor up `lines` times, return to start (\r), clear below
180        write!(
181            stdout,
182            "\r{}{}",
183            anes::MoveCursorUp(lines as u16),
184            anes::ClearBuffer::Below,
185        )?;
186    }
187    Ok(())
188}
189
190fn tipset_key_to_string(key: &TipsetKey) -> String {
191    let cids = key.to_cids();
192    match cids.len() {
193        0 => "[]".to_string(),
194        _ => format!("[{}, ...]", cids.first()),
195    }
196}
197
198/// Check if the snapshot download is in progress, if wait is true,
199/// wait till snapshot download is completed else return after checking once
200async fn check_snapshot_progress(
201    client: &rpc::Client,
202    wait: bool,
203) -> anyhow::Result<SnapshotProgressState> {
204    let mut interval = time::interval(Duration::from_secs(5));
205    let mut stdout = stdout();
206    loop {
207        interval.tick().await;
208
209        let progress_state = client.call(SyncSnapshotProgress::request(())?).await?;
210
211        write!(
212            stdout,
213            "\r{}{}Snapshot status: {}\n",
214            anes::MoveCursorUp(1),
215            anes::ClearLine::All,
216            progress_state
217        )?;
218        stdout.flush()?;
219
220        match progress_state {
221            SnapshotProgressState::Completed | SnapshotProgressState::NotRequired => {
222                println!();
223                return Ok(progress_state);
224            }
225            _ if !wait => {
226                return Ok(progress_state);
227            }
228            _ => {} // continue
229        }
230    }
231}
232
233/// Waits for node initialization to complete (start `Syncing`).
234async fn wait_for_node_to_start_syncing(client: &rpc::Client) -> anyhow::Result<()> {
235    let mut is_msg_printed = false;
236    let mut stdout = stdout();
237    const POLLING_INTERVAL: Duration = Duration::from_secs(1);
238
239    loop {
240        let report = SyncStatus::call(client, ())
241            .await
242            .context("Failed to get sync status while waiting for initialization to complete")?;
243
244        if report.status == NodeSyncStatus::Initializing {
245            write!(stdout, "\ršŸ”„ Node syncing is initializing, please wait...")?;
246            stdout.flush()?;
247            is_msg_printed = true;
248
249            sleep(POLLING_INTERVAL).await;
250        } else {
251            if is_msg_printed {
252                clear_previous_lines(&mut stdout, 1)
253                    .context("Failed to clear initializing message")?;
254            }
255
256            break;
257        }
258    }
259
260    Ok(())
261}
262
263/// Checks if a snapshot download is required or in progress when the node is initializing.
264/// If a snapshot download is in progress, it waits for completion before starting the sync monitor.
265async fn handle_initial_snapshot_check(client: &rpc::Client) -> anyhow::Result<()> {
266    let initial_report = SyncStatus::call(client, ())
267        .await
268        .context("Failed to get sync status")?;
269    if initial_report.status == NodeSyncStatus::Initializing {
270        // if the snapshot download is not required, then return,
271        // else wait till the snapshot download is completed.
272        if !check_snapshot_progress(client, false)
273            .await?
274            .is_not_required()
275        {
276            check_snapshot_progress(client, true).await?;
277        }
278    }
279
280    Ok(())
281}