1use anyhow::Error;
2use bytesize::ByteSize;
3use indicatif::MultiProgress;
4use indicatif::ProgressBar;
5use indicatif::ProgressStyle;
6use memmapix::MmapMut;
7use std::collections::VecDeque;
8use std::path::PathBuf;
9
10use std::fs::OpenOptions;
11
12use std::fs::File;
13use std::io::Seek;
14use std::io::SeekFrom;
15use std::io::Write;
16use std::path::Path;
17
/// Download state of a single chunk, persisted as one byte per chunk.
///
/// Only the five byte values listed on the associated `*_BYTE` constants are
/// valid encodings. Every other byte decodes to [`ChunkState::Invalid`], so a
/// damaged map entry is never mistaken for a usable state.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum ChunkState {
    /// Nothing has been downloaded for this chunk yet (encoded as `0`).
    #[default]
    Todo,
    /// A download of this chunk was started but not marked as finished.
    InProgress,
    /// The chunk is marked as failed.
    Failed,
    /// The stored byte is the explicit "invalid" marker or an unknown value.
    Invalid,
    /// The chunk is marked as completely downloaded.
    Done,
}

impl ChunkState {
    // Single source of truth for the on-disk encoding; both conversions below
    // use these constants so they cannot drift apart.
    const TODO_BYTE: u8 = 0b0000_0000;
    const IN_PROGRESS_BYTE: u8 = 0b0100_0000;
    const FAILED_BYTE: u8 = 0b1000_0000;
    const INVALID_BYTE: u8 = 0b1010_1010;
    const DONE_BYTE: u8 = 0b1111_1111;
}

impl From<u8> for ChunkState {
    /// Decodes a stored byte.
    ///
    /// Bytes that are not a known encoding (including the previously accepted
    /// stray value `0b100`) decode to [`ChunkState::Invalid`].
    fn from(u: u8) -> Self {
        match u {
            ChunkState::TODO_BYTE => ChunkState::Todo,
            ChunkState::IN_PROGRESS_BYTE => ChunkState::InProgress,
            ChunkState::FAILED_BYTE => ChunkState::Failed,
            ChunkState::DONE_BYTE => ChunkState::Done,
            ChunkState::INVALID_BYTE => ChunkState::Invalid,
            _ => ChunkState::Invalid,
        }
    }
}

impl From<ChunkState> for u8 {
    /// Encodes a state as the byte that is written to the chunk map.
    fn from(cs: ChunkState) -> Self {
        match cs {
            ChunkState::Todo => ChunkState::TODO_BYTE,
            ChunkState::InProgress => ChunkState::IN_PROGRESS_BYTE,
            ChunkState::Failed => ChunkState::FAILED_BYTE,
            ChunkState::Invalid => ChunkState::INVALID_BYTE,
            ChunkState::Done => ChunkState::DONE_BYTE,
        }
    }
}
55
56#[derive(Debug)]
57struct ChunkMap {
58 number_of_chunks: usize,
59 mmap: MmapMut,
60}
61
62impl ChunkMap {
63 pub fn create(name: &Path, number_of_chunks: usize) -> anyhow::Result<Self> {
64 let file = OpenOptions::new()
65 .read(true)
66 .write(true)
67 .create(true)
68 .open(&name)?;
69
70 file.set_len(number_of_chunks as u64)?;
71
72 let mmap = unsafe { MmapMut::map_mut(&file)? };
73 let s = Self {
74 number_of_chunks,
75 mmap,
76 };
77
78 Ok(s)
79 }
80
81 pub fn open(name: &Path) -> anyhow::Result<Self> {
82 let file = OpenOptions::new().read(true).write(true).open(&name)?;
83
84 let mmap = unsafe { MmapMut::map_mut(&file)? };
85 let number_of_chunks = mmap.len();
86 let s = Self {
87 number_of_chunks,
88 mmap,
89 };
90
91 Ok(s)
92 }
93
94 pub fn for_each_todo<F>(&self, mut f: F) -> anyhow::Result<()>
95 where
96 F: FnMut(usize, ChunkState) -> anyhow::Result<()>,
97 {
98 for (i, c) in self.mmap.iter().enumerate() {
99 let cs = ChunkState::from(*c);
100 match cs {
101 ChunkState::Todo => f(i, cs)?,
102 _ => {}
103 }
104 }
105 Ok(())
106 }
107
108 pub fn for_each_inprogress<F>(&self, mut f: F) -> anyhow::Result<()>
109 where
110 F: FnMut(usize, ChunkState) -> anyhow::Result<()>,
111 {
112 for (i, c) in self.mmap.iter().enumerate() {
113 let cs = ChunkState::from(*c);
114 match cs {
116 ChunkState::InProgress => f(i, cs)?,
117 _ => {}
118 }
119 }
120 Ok(())
121 }
122
123 pub fn set_chunk_state(&mut self, i: usize, s: ChunkState) -> anyhow::Result<()> {
124 if i >= self.number_of_chunks {
125 anyhow::bail!("Out of bounds")
126 }
127
128 self.mmap[i] = u8::from(s); Ok(())
130 }
131
132 pub fn chunk_count(&self) -> usize {
133 self.mmap.len()
134 }
135
136 pub fn get_chunk_state(&self, i: usize) -> Option<ChunkState> {
137 if i >= self.mmap.len() {
138 return None;
139 }
140
141 let c = self.mmap[i];
142 let cs = ChunkState::from(c);
143 Some(cs)
144 }
145}
146
147#[derive(Debug)]
148pub struct Snapshot {
149 id: String,
150 progress: Option<MultiProgress>,
151 r#continue: bool,
152
153 image_file: PathBuf,
154 map_file: PathBuf,
155}
156
157const BLOCKS_PER_CHUNK: usize = 100; const BLOCK_SIZE: usize = 524288; const CHUNK_SIZE: usize = BLOCK_SIZE * BLOCKS_PER_CHUNK;
160
161impl Snapshot {
162 pub fn new(id: &str) -> Self {
163 Self {
164 id: id.to_string(),
165 progress: None,
166 r#continue: false,
167
168 image_file: Path::new(&format!("./{}.img", &id)).to_path_buf(),
169 map_file: Path::new(&format!("./{}.omsmap", &id)).to_path_buf(),
170 }
172 }
173
174 pub fn enable_continue(&mut self) {
175 self.r#continue = true;
176 }
177
178 pub fn use_progress(&mut self, m: MultiProgress) {
179 self.progress = Some(m);
180 }
181
182 async fn ec2_client(&self) -> anyhow::Result<aws_sdk_ec2::Client> {
183 let config = aws_config::load_from_env().await;
184 let ec2_client = aws_sdk_ec2::Client::new(&config);
185
186 Ok::<aws_sdk_ec2::Client, Error>(ec2_client)
187 }
188
189 async fn ebs_client(&self) -> anyhow::Result<aws_sdk_ebs::Client> {
190 let config = aws_config::load_from_env().await;
191 let ebs_client = aws_sdk_ebs::Client::new(&config);
192
193 Ok(ebs_client)
194 }
195
196 pub fn image_file(&self) -> &Path {
197 &self.image_file
198 }
199
200 pub fn map_file(&self) -> &Path {
201 &self.map_file
202 }
203
204 pub async fn status(&mut self) -> anyhow::Result<bool> {
205 let mut all_good = true;
206
207 let image_file = self.image_file();
208 println!("Image file {image_file:?}");
209
210 let mut file_size = 0;
211 match image_file.try_exists() {
212 Ok(true) => {
213 println!("\t ... exists.");
214 let attr = std::fs::metadata(image_file)?;
215
216 if !attr.is_file() {
217 println!("\t ... is NOT a plain file.");
218 all_good = false;
219 } else {
220 let l = attr.len();
221 println!("\t ... contains {l} bytes.");
222 println!("\t ... contains {}.", ByteSize::b(l));
223
224 file_size = l;
225 }
226 }
227 Ok(false) => {
228 println!("\t ... NOT exists.");
229 all_good = false;
230 }
231 Err(o) => {
232 anyhow::bail!("Failed checking image file {image_file:?} -> {o}")
233 }
234 };
235
236 let map_file = self.map_file();
237 println!("Map file {map_file:?}");
238 match map_file.try_exists() {
239 Ok(true) => {
240 println!("\t ... exists.");
241 let attr = std::fs::metadata(map_file)?;
242
243 if !attr.is_file() {
244 println!("\t ... is NOT a plain file.");
245 all_good = false;
246 } else {
247 let l = attr.len();
248 println!("\t ... contains {l} chunks.");
249 let min_size = l * CHUNK_SIZE as u64;
250 let max_size = min_size + CHUNK_SIZE as u64;
251 println!("\t ... expected file size {} - {}.", min_size, max_size);
252 println!("\t ... expected file size ~{}.", ByteSize::b(min_size));
253
254 if file_size > max_size {
255 println!(
256 "\t ... Image file size is too big {} > {}",
257 file_size, max_size
258 );
259 all_good = false;
260 } else if file_size < min_size {
261 println!(
262 "\t ... Image file size is too small {} < {}",
263 file_size, min_size
264 );
265 all_good = false;
266 } else {
267 println!(
268 "\t ... Image file size matches: {} < {} < {}",
269 min_size, file_size, max_size
270 );
271 }
272 }
273 }
274 Ok(false) => {
275 println!("\t ... NOT exists.");
276 all_good = false;
277 }
278 Err(o) => {
279 anyhow::bail!("Failed checking map file {map_file:?} -> {o}")
280 }
281 };
282
283 const CODE_TODO: &str = "⬛️";
285 const CODE_INPROGRESS: &str = "🚚";
286 const CODE_FAILED: &str = "🔺";
287 const CODE_INVALID: &str = "❗️";
288 const CODE_DONE: &str = "🟩";
289
290 if all_good {
291 let map = ChunkMap::open(map_file)?;
292 for i in 0..map.chunk_count() {
295 if let Some(cs) = map.get_chunk_state(i) {
296 let code = match cs {
306 ChunkState::Todo => CODE_TODO,
307 ChunkState::InProgress => CODE_INPROGRESS,
308 ChunkState::Failed => CODE_FAILED,
309 ChunkState::Invalid => CODE_INVALID,
310 ChunkState::Done => CODE_DONE,
311 };
312 print!("{code}");
313 } else {
314 print!("!");
315 }
316
317 if i % 20 == 19 {
318 println!("");
319 }
320 }
321 }
322 println!("");
323 println!("{CODE_DONE} = Done");
324 println!("{CODE_INPROGRESS} = InProgress");
325 println!("{CODE_FAILED} = Failed");
326 println!("{CODE_TODO} = todo");
327 println!("{CODE_INVALID} = Invalid");
328 Ok(all_good)
329 }
330
331 pub async fn verify(&mut self) -> anyhow::Result<()> {
332 Ok(())
333 }
334
335 pub async fn download(&mut self) -> anyhow::Result<()> {
336 let ec2_client = self.ec2_client().await?;
337
338 let snapshots = ec2_client.describe_snapshots().snapshot_ids(&self.id);
339
340 let snapshots = snapshots.send().await?;
341
342 let size_in_bytes; if let Some(snapshots) = snapshots.snapshots {
344 if let Some((_description, _state, size)) = snapshots.iter().find_map(|s| {
345 if s.snapshot_id != Some(self.id.clone()) {
347 None
348 } else {
349 Some((s.description.clone(), s.state.clone(), s.volume_size))
351 }
352 }) {
353 size.expect("Volume size is needed");
354
355 let size = size.unwrap() as usize;
356 size_in_bytes = size * 1_073_741_824;
357
358 tracing::info!("Downloading {}GiB", size)
359 } else {
360 anyhow::bail!("Snapshot {} not found", &self.id);
361 }
362 } else {
363 anyhow::bail!("Snapshot {} not found", &self.id);
364 }
365
366 let filename = format!("./{}.img", &self.id);
367 let path = Path::new(&filename);
368 let mut f = match path.try_exists() {
369 Ok(true) => {
370 if !self.r#continue {
372 anyhow::bail!("{filename} exists, but 'continue' was not requested");
373 }
374 OpenOptions::new().write(true).open(&path)?
375 }
376 Ok(false) => {
377 File::create(&path)?
379 }
380 Err(o) => {
381 tracing::error!("Failed verifying if {filename} exists -> {o}");
382 anyhow::bail!("Failed verifying if {filename} exists -> {o}")
383 }
384 };
385
386 f.set_len(size_in_bytes as u64)?;
388
389 let chunks = size_in_bytes / CHUNK_SIZE;
390
391 let map_name = self.map_file(); let mut chunk_map = ChunkMap::create(map_name, chunks)?;
394
395 tracing::info!("Queing {} chunks", chunks);
396
397 let mut chunk_queue = VecDeque::new();
398
399 chunk_map.for_each_inprogress(|i, _s| {
400 dbg!(i);
401 chunk_queue.push_back(i);
402 Ok(())
403 })?;
404
405 chunk_map.for_each_todo(|i, _s| {
406 chunk_queue.push_back(i);
407 Ok(())
408 })?;
409
410 let chunk_progress = if let Some(mp) = &self.progress {
411 let cl = chunk_queue.len();
412
413 let sty = ProgressStyle::with_template(
414 "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} [{eta_precise}] {msg}",
415 )
416 .unwrap()
417 .progress_chars("##-");
418
419 let progress = mp.add(ProgressBar::new(cl as u64));
420 progress.set_style(sty.clone());
421
422 Some(progress)
423 } else {
424 None
425 };
426
427 while let Some(c) = chunk_queue.pop_front() {
428 if let Some(pb) = &chunk_progress {
430 pb.set_message(format!("Downloading chunk {} / {}", c, chunks));
431 }
432
433 {
434 chunk_map.set_chunk_state(c, ChunkState::InProgress)?;
436
437 let block_progress = if let Some(mp) = &self.progress {
438 let sty = ProgressStyle::with_template(
439 "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} [{eta_precise}] {msg}",
440 )
441 .unwrap()
442 .progress_chars("##-");
443
444 let progress = mp.add(ProgressBar::new(BLOCKS_PER_CHUNK as u64));
445 progress.set_style(sty.clone());
446
447 Some(progress)
448 } else {
449 None
450 };
451
452 let client = self.ebs_client().await?; let first_block_in_chunk = (c * BLOCKS_PER_CHUNK) as i32;
455 let last_block_in_chunk =
456 (first_block_in_chunk + BLOCKS_PER_CHUNK as i32 - 1) as i32;
457
458 let list = client
459 .list_snapshot_blocks()
460 .snapshot_id(&self.id)
461 .starting_block_index(first_block_in_chunk)
462 .max_results(BLOCKS_PER_CHUNK as i32);
463 let list = list.send().await?;
464
465 for block in &list.blocks.unwrap() {
467 match (block.block_index, &block.block_token) {
468 (Some(i), Some(t)) => {
469 if i >= first_block_in_chunk && i <= last_block_in_chunk {
474 if let Some(pb) = &block_progress {
476 pb.set_message(format!(
477 "Downloading block {} [{}-{}]",
478 i, first_block_in_chunk, last_block_in_chunk
479 ));
480 }
481
482 let block = client
483 .get_snapshot_block()
484 .snapshot_id(&self.id)
485 .block_index(i)
486 .block_token(t);
487
488 let block = block.send().await?;
489
490 let p = i as u64 * BLOCK_SIZE as u64;
492
493 f.seek(SeekFrom::Start(p as u64))?;
494 let data = block.block_data.collect().await?;
496 f.write(&data.into_bytes())?;
498
499 if let Some(block_progress) = &block_progress {
500 block_progress.inc(1);
501 }
502 }
503 }
504 _ => {
505 }
507 }
508 }
509 chunk_map.set_chunk_state(c, ChunkState::Done)?;
510 if let Some(chunk_progress) = &chunk_progress {
511 chunk_progress.inc(1);
512 }
513 }
514 }
515
516 Ok(())
517 }
518}