1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
use crate::cli::command::sync::IfExistsOutputBehaviour;
use crate::cli::command::sync::drive_sync_info::DriveSyncInfo;
use crate::cli::command::sync::drive_sync_info::resolve_drive_infos;
use crate::cli::command::sync::index::SyncIndexArgs;
use crate::cli::command::sync::mft::SyncMftArgs;
use arbitrary::Arbitrary;
use facet::Facet;
use figue::{self as args};
use futures::TryStreamExt;
use std::collections::BTreeSet;
use std::sync::Arc;
use teamy_windows::storage::DriveLetterPattern;
use tokio_stream::StreamExt;
use tracing::info_span;
// Arguments for the `sync` command: which drives to process, how to treat output
// files that already exist, and which sync stage to run.
// NOTE(review): the `///` comments below are presumably surfaced as CLI help text by
// the `Facet` derive — confirm before rewording them.
#[derive(Facet, PartialEq, Debug, Arbitrary, Default)]
pub struct SyncArgs {
/// Drive letter pattern to match drives to sync (e.g., "*", "C", "CD", "C,D")
#[facet(args::named, default)]
pub drive_letter_pattern: DriveLetterPattern,
/// How to handle existing output files
#[facet(args::named, default)]
pub if_exists: IfExistsOutputBehaviour,
/// Sync stage to run
// `None` is replaced by `SyncCommand::default()` (the `Both` variant) in
// `SyncArgs::invoke`.
#[facet(args::subcommand)]
pub command: Option<SyncCommand>,
}
impl SyncArgs {
    /// Runs the selected sync stage and blocks the calling thread until it finishes.
    ///
    /// The drives matching `drive_letter_pattern` are resolved first, then the stage
    /// (the default [`SyncCommand`] when no subcommand was given) is driven to
    /// completion on a freshly built multi-threaded Tokio runtime.
    ///
    /// # Errors
    ///
    /// Returns an error if the matching drives cannot be resolved, if the Tokio
    /// runtime cannot be built, or if the sync stage itself fails.
    pub fn invoke(self) -> eyre::Result<()> {
        let Self {
            drive_letter_pattern,
            if_exists,
            command,
        } = self;

        // Resolve drives before paying for a runtime; a bad pattern fails fast.
        let drives = resolve_drive_infos(&drive_letter_pattern)?;
        let stage = command.unwrap_or_default();

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()?;
        runtime.block_on(stage.invoke(drives, &if_exists))
    }
}
// The sync stage selected on the command line. Variant names are renamed to
// kebab-case by the `facet` attribute below.
// NOTE(review): the `///` comments on the variants are presumably surfaced as CLI help
// text by the `Facet` derive — confirm before rewording them.
#[derive(Facet, Arbitrary, PartialEq, Debug, Clone, Default)]
#[repr(u8)]
#[facet(rename_all = "kebab-case")]
pub enum SyncCommand {
/// Sync raw .mft snapshots
Mft(SyncMftArgs),
/// Build `.mft_search_index` files from snapshots
Index(SyncIndexArgs),
/// Sync both stages sequentially, with preflight checks and error handling for both stages
// `Both` is the `Default` variant, so it is what runs when no subcommand is given.
// NOTE(review): `SyncCommand::invoke` overlaps index building with MFT reads for this
// variant rather than running the stages strictly one after the other — the word
// "sequentially" above may be outdated.
#[default]
Both,
}
impl SyncCommand {
/// # Errors
///
/// Returns an error if the sync fails, likely caused by IO problems.
#[expect(
clippy::too_many_lines,
reason = "This command coordinates multiple sync paths and error-handling branches."
)]
pub async fn invoke(
&self,
drive_infos: Vec<DriveSyncInfo>,
if_exists: &IfExistsOutputBehaviour,
) -> eyre::Result<()> {
match self {
Self::Mft(SyncMftArgs) => {
let drive_infos = SyncMftArgs::invoke_preflight(drive_infos, if_exists)?;
let mft_data = {
let _guard = info_span!("dispatch mft sync work").entered();
SyncMftArgs::invoke(drive_infos)?
};
tokio::pin!(mft_data);
let _guard = info_span!("collect mft sync results").entered();
while let Some(result) = mft_data.next().await {
let (drive_info, physical_mft) = result?;
{
let _guard = info_span!(
"drop_physical_mft_read_result",
drive = %drive_info.drive_letter,
physical_segments = physical_mft.physical_read_results.entries.len(),
logical_segments = physical_mft.logical_read_plan.segments.len(),
)
.entered();
drop(physical_mft);
}
}
Ok(())
}
Self::Index(SyncIndexArgs) => {
let drive_infos = SyncIndexArgs::invoke_preflight(drive_infos, if_exists)?;
SyncIndexArgs.invoke(drive_infos)
}
Self::Both => {
// The two stages have different skip/overwrite/abort filtering rules, so
// they must each run their own preflight over the same initial drive set.
let mft_drive_infos =
SyncMftArgs::invoke_preflight(drive_infos.clone(), if_exists)?;
let index_drive_infos = SyncIndexArgs::invoke_preflight(drive_infos, if_exists)?;
// Drives present in both sets can build the index directly from the fresh
// in-memory `PhysicalMftReadResult` produced by the MFT stage, avoiding a
// write-then-read roundtrip through the cached `.mft` file.
let mft_drive_letters = mft_drive_infos
.iter()
.map(|info| info.drive_letter)
.collect::<BTreeSet<_>>();
let in_memory_index_drive_letters = Arc::new(
index_drive_infos
.iter()
.map(|info| info.drive_letter)
.filter(|drive_letter| mft_drive_letters.contains(drive_letter))
.collect::<BTreeSet<_>>(),
);
// Any drive that still needs indexing but is not part of the current MFT sync
// cannot use the in-memory fast path. This happens, for example, when the MFT
// file already exists and MFT sync is skipped, but the search index still needs
// to be built from the cached `.mft` on disk.
let fallback_index_drive_infos = index_drive_infos
.into_iter()
.filter(|info| !mft_drive_letters.contains(&info.drive_letter))
.collect::<Vec<_>>();
let mft_data = {
let _guard = info_span!("dispatch mft sync work").entered();
SyncMftArgs::invoke(mft_drive_infos)?
};
let in_memory_index_drive_letters_for_stream =
Arc::clone(&in_memory_index_drive_letters);
let in_memory_indexing = async move {
// Consume completed MFT reads as they arrive and fan index construction out
// concurrently so slow drives do not block faster ones.
let _guard = info_span!(
"collect mft sync and in_memory index results",
drive_count = in_memory_index_drive_letters_for_stream.len(),
)
.entered();
mft_data
.try_for_each_concurrent(None, move |(drive_info, physical_mft)| {
let in_memory_index_drive_letters =
Arc::clone(&in_memory_index_drive_letters_for_stream);
async move {
if !in_memory_index_drive_letters.contains(&drive_info.drive_letter)
{
return Ok(());
}
tokio::task::spawn_blocking(move || -> eyre::Result<()> {
let _guard = info_span!(
"build_in_memory_search_index_for_drive",
drive = %drive_info.drive_letter,
index_path = %drive_info.index_output_path.display(),
)
.entered();
let mft_file = physical_mft.to_mft_file()?;
SyncIndexArgs.invoke_for_mft_file(&drive_info, &mft_file)?;
{
let _guard = info_span!(
"drop_in_memory_index_inputs",
drive = %drive_info.drive_letter,
physical_segments = physical_mft.physical_read_results.entries.len(),
logical_segments = physical_mft.logical_read_plan.segments.len(),
mft_entries = mft_file.record_count(),
)
.entered();
drop(mft_file);
drop(physical_mft);
};
Ok(())
})
.await
.map_err(|error| {
eyre::eyre!("Failed joining in-memory index task: {error}")
})??;
Ok(())
}
})
.await
};
let disk_indexing = async move {
// Run the disk-backed index path in parallel with the in-memory path so
// drives skipped by the MFT stage do not have to wait for fresh MFT reads.
if fallback_index_drive_infos.is_empty() {
return Ok(());
}
let _guard = info_span!(
"build_disk_backed_search_indexes",
drive_count = fallback_index_drive_infos.len(),
)
.entered();
SyncIndexArgs.invoke(fallback_index_drive_infos)
};
tokio::try_join!(in_memory_indexing, disk_indexing)?;
Ok(())
}
}
}
}