Skip to main content

om_snapper/
snapshot.rs

1use anyhow::Error;
2use bytesize::ByteSize;
3use indicatif::MultiProgress;
4use indicatif::ProgressBar;
5use indicatif::ProgressStyle;
6use memmapix::MmapMut;
7use std::collections::VecDeque;
8use std::path::PathBuf;
9
10use std::fs::OpenOptions;
11
12use std::fs::File;
13use std::io::Seek;
14use std::io::SeekFrom;
15use std::io::Write;
16use std::path::Path;
17
/// Download state of a single chunk, stored as one byte per chunk in the map file.
///
/// `Todo` is the default state; it is encoded as the zero byte (see the
/// `From<ChunkState> for u8` conversion), which is also what a freshly
/// zero-extended map file contains.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum ChunkState {
    /// Not downloaded yet.
    #[default]
    Todo,
    /// Download was started but not recorded as finished.
    InProgress,
    /// Download failed.
    Failed,
    /// The stored byte does not correspond to any known state.
    Invalid,
    /// Chunk was downloaded completely.
    Done,
}
27
28impl From<u8> for ChunkState {
29    fn from(u: u8) -> Self {
30        match u {
31            0b0000_0000 => ChunkState::Todo,
32            0b0100_0000 => ChunkState::InProgress,
33            0b1000_0000 => ChunkState::Failed,
34            0b1111_1111 => ChunkState::Done,
35            //            0b100 => ChunkState::Done,     // :HACK:
36            0b100 => ChunkState::Todo, // :HACK:
37            //            0b1 => ChunkState::InProgress, // :HACK:
38            0b1010_1010 => ChunkState::Invalid,
39            _ => ChunkState::Invalid,
40        }
41    }
42}
43
44impl From<ChunkState> for u8 {
45    fn from(cs: ChunkState) -> Self {
46        match cs {
47            ChunkState::Todo => 0b0000_0000,
48            ChunkState::InProgress => 0b0100_0000,
49            ChunkState::Failed => 0b1000_0000,
50            ChunkState::Invalid => 0b1010_1010, // Note: Invalid should never be written
51            ChunkState::Done => 0b1111_1111,
52        }
53    }
54}
55
56#[derive(Debug)]
57struct ChunkMap {
58    number_of_chunks: usize,
59    mmap: MmapMut,
60}
61
62impl ChunkMap {
63    pub fn create(name: &Path, number_of_chunks: usize) -> anyhow::Result<Self> {
64        let file = OpenOptions::new()
65            .read(true)
66            .write(true)
67            .create(true)
68            .open(&name)?;
69
70        file.set_len(number_of_chunks as u64)?;
71
72        let mmap = unsafe { MmapMut::map_mut(&file)? };
73        let s = Self {
74            number_of_chunks,
75            mmap,
76        };
77
78        Ok(s)
79    }
80
81    pub fn open(name: &Path) -> anyhow::Result<Self> {
82        let file = OpenOptions::new().read(true).write(true).open(&name)?;
83
84        let mmap = unsafe { MmapMut::map_mut(&file)? };
85        let number_of_chunks = mmap.len();
86        let s = Self {
87            number_of_chunks,
88            mmap,
89        };
90
91        Ok(s)
92    }
93
94    pub fn for_each_todo<F>(&self, mut f: F) -> anyhow::Result<()>
95    where
96        F: FnMut(usize, ChunkState) -> anyhow::Result<()>,
97    {
98        for (i, c) in self.mmap.iter().enumerate() {
99            let cs = ChunkState::from(*c);
100            match cs {
101                ChunkState::Todo => f(i, cs)?,
102                _ => {}
103            }
104        }
105        Ok(())
106    }
107
108    pub fn for_each_inprogress<F>(&self, mut f: F) -> anyhow::Result<()>
109    where
110        F: FnMut(usize, ChunkState) -> anyhow::Result<()>,
111    {
112        for (i, c) in self.mmap.iter().enumerate() {
113            let cs = ChunkState::from(*c);
114            // eprintln!("{} -> {:?} ({:#b})", i, cs, c);
115            match cs {
116                ChunkState::InProgress => f(i, cs)?,
117                _ => {}
118            }
119        }
120        Ok(())
121    }
122
123    pub fn set_chunk_state(&mut self, i: usize, s: ChunkState) -> anyhow::Result<()> {
124        if i >= self.number_of_chunks {
125            anyhow::bail!("Out of bounds")
126        }
127
128        self.mmap[i] = u8::from(s); // as u8;
129        Ok(())
130    }
131
132    pub fn chunk_count(&self) -> usize {
133        self.mmap.len()
134    }
135
136    pub fn get_chunk_state(&self, i: usize) -> Option<ChunkState> {
137        if i >= self.mmap.len() {
138            return None;
139        }
140
141        let c = self.mmap[i];
142        let cs = ChunkState::from(c);
143        Some(cs)
144    }
145}
146
147#[derive(Debug)]
148pub struct Snapshot {
149    id: String,
150    progress: Option<MultiProgress>,
151    r#continue: bool,
152
153    image_file: PathBuf,
154    map_file: PathBuf,
155}
156
/// Number of snapshot blocks grouped into one chunk (>=100 as per AWS API).
const BLOCKS_PER_CHUNK: usize = 100;
/// Size of one snapshot block in bytes (512KiB).
const BLOCK_SIZE: usize = 512 * 1024;
/// Size of one chunk in bytes.
const CHUNK_SIZE: usize = BLOCKS_PER_CHUNK * BLOCK_SIZE;
160
161impl Snapshot {
162    pub fn new(id: &str) -> Self {
163        Self {
164            id: id.to_string(),
165            progress: None,
166            r#continue: false,
167
168            image_file: Path::new(&format!("./{}.img", &id)).to_path_buf(),
169            map_file: Path::new(&format!("./{}.omsmap", &id)).to_path_buf(),
170            //                    let filename = format!("./{}.img", &self.id);
171        }
172    }
173
174    pub fn enable_continue(&mut self) {
175        self.r#continue = true;
176    }
177
178    pub fn use_progress(&mut self, m: MultiProgress) {
179        self.progress = Some(m);
180    }
181
182    async fn ec2_client(&self) -> anyhow::Result<aws_sdk_ec2::Client> {
183        let config = aws_config::load_from_env().await;
184        let ec2_client = aws_sdk_ec2::Client::new(&config);
185
186        Ok::<aws_sdk_ec2::Client, Error>(ec2_client)
187    }
188
189    async fn ebs_client(&self) -> anyhow::Result<aws_sdk_ebs::Client> {
190        let config = aws_config::load_from_env().await;
191        let ebs_client = aws_sdk_ebs::Client::new(&config);
192
193        Ok(ebs_client)
194    }
195
196    pub fn image_file(&self) -> &Path {
197        &self.image_file
198    }
199
200    pub fn map_file(&self) -> &Path {
201        &self.map_file
202    }
203
204    pub async fn status(&mut self) -> anyhow::Result<bool> {
205        let mut all_good = true;
206
207        let image_file = self.image_file();
208        println!("Image file {image_file:?}");
209
210        let mut file_size = 0;
211        match image_file.try_exists() {
212            Ok(true) => {
213                println!("\t ... exists.");
214                let attr = std::fs::metadata(image_file)?;
215
216                if !attr.is_file() {
217                    println!("\t ... is NOT a plain file.");
218                    all_good = false;
219                } else {
220                    let l = attr.len();
221                    println!("\t ... contains {l} bytes.");
222                    println!("\t ... contains {}.", ByteSize::b(l));
223
224                    file_size = l;
225                }
226            }
227            Ok(false) => {
228                println!("\t ... NOT exists.");
229                all_good = false;
230            }
231            Err(o) => {
232                anyhow::bail!("Failed checking image file {image_file:?}  -> {o}")
233            }
234        };
235
236        let map_file = self.map_file();
237        println!("Map file {map_file:?}");
238        match map_file.try_exists() {
239            Ok(true) => {
240                println!("\t ... exists.");
241                let attr = std::fs::metadata(map_file)?;
242
243                if !attr.is_file() {
244                    println!("\t ... is NOT a plain file.");
245                    all_good = false;
246                } else {
247                    let l = attr.len();
248                    println!("\t ... contains {l} chunks.");
249                    let min_size = l * CHUNK_SIZE as u64;
250                    let max_size = min_size + CHUNK_SIZE as u64;
251                    println!("\t ... expected file size {} - {}.", min_size, max_size);
252                    println!("\t ... expected file size ~{}.", ByteSize::b(min_size));
253
254                    if file_size > max_size {
255                        println!(
256                            "\t ... Image file size is too big {} > {}",
257                            file_size, max_size
258                        );
259                        all_good = false;
260                    } else if file_size < min_size {
261                        println!(
262                            "\t ... Image file size is too small {} < {}",
263                            file_size, min_size
264                        );
265                        all_good = false;
266                    } else {
267                        println!(
268                            "\t ... Image file size matches: {} < {} < {}",
269                            min_size, file_size, max_size
270                        );
271                    }
272                }
273            }
274            Ok(false) => {
275                println!("\t ... NOT exists.");
276                all_good = false;
277            }
278            Err(o) => {
279                anyhow::bail!("Failed checking map file {map_file:?}  -> {o}")
280            }
281        };
282
283        // :TODO: make configurable
284        const CODE_TODO: &str = "⬛️";
285        const CODE_INPROGRESS: &str = "🚚";
286        const CODE_FAILED: &str = "🔺";
287        const CODE_INVALID: &str = "❗️";
288        const CODE_DONE: &str = "🟩";
289
290        if all_good {
291            let map = ChunkMap::open(map_file)?;
292            // dbg!(&map);
293
294            for i in 0..map.chunk_count() {
295                if let Some(cs) = map.get_chunk_state(i) {
296                    /*
297                    let code = match cs {
298                        ChunkState::Todo => 't',
299                        ChunkState::InProgress => 'p',
300                        ChunkState::Failed => 'F',
301                        ChunkState::Invalid => 'I',
302                        ChunkState::Done => 'D',
303                    };
304                    */
305                    let code = match cs {
306                        ChunkState::Todo => CODE_TODO,
307                        ChunkState::InProgress => CODE_INPROGRESS,
308                        ChunkState::Failed => CODE_FAILED,
309                        ChunkState::Invalid => CODE_INVALID,
310                        ChunkState::Done => CODE_DONE,
311                    };
312                    print!("{code}");
313                } else {
314                    print!("!");
315                }
316
317                if i % 20 == 19 {
318                    println!("");
319                }
320            }
321        }
322        println!("");
323        println!("{CODE_DONE} = Done");
324        println!("{CODE_INPROGRESS} = InProgress");
325        println!("{CODE_FAILED} = Failed");
326        println!("{CODE_TODO} = todo");
327        println!("{CODE_INVALID} = Invalid");
328        Ok(all_good)
329    }
330
331    pub async fn verify(&mut self) -> anyhow::Result<()> {
332        Ok(())
333    }
334
335    pub async fn download(&mut self) -> anyhow::Result<()> {
336        let ec2_client = self.ec2_client().await?;
337
338        let snapshots = ec2_client.describe_snapshots().snapshot_ids(&self.id);
339
340        let snapshots = snapshots.send().await?;
341
342        let size_in_bytes; // = 0;
343        if let Some(snapshots) = snapshots.snapshots {
344            if let Some((_description, _state, size)) = snapshots.iter().find_map(|s| {
345                // this is a bit silly since we should expect exactly one result
346                if s.snapshot_id != Some(self.id.clone()) {
347                    None
348                } else {
349                    //dbg!(s);
350                    Some((s.description.clone(), s.state.clone(), s.volume_size))
351                }
352            }) {
353                size.expect("Volume size is needed");
354
355                let size = size.unwrap() as usize;
356                size_in_bytes = size * 1_073_741_824;
357
358                tracing::info!("Downloading {}GiB", size)
359            } else {
360                anyhow::bail!("Snapshot {} not found", &self.id);
361            }
362        } else {
363            anyhow::bail!("Snapshot {} not found", &self.id);
364        }
365
366        let filename = format!("./{}.img", &self.id);
367        let path = Path::new(&filename);
368        let mut f = match path.try_exists() {
369            Ok(true) => {
370                // check continue
371                if !self.r#continue {
372                    anyhow::bail!("{filename} exists, but 'continue' was not requested");
373                }
374                OpenOptions::new().write(true).open(&path)?
375            }
376            Ok(false) => {
377                // create
378                File::create(&path)?
379            }
380            Err(o) => {
381                tracing::error!("Failed verifying if {filename} exists -> {o}");
382                anyhow::bail!("Failed verifying if {filename} exists -> {o}")
383            }
384        };
385
386        // preallocate the file on disk
387        f.set_len(size_in_bytes as u64)?;
388
389        let chunks = size_in_bytes / CHUNK_SIZE;
390
391        let map_name = self.map_file(); //format!("./{}.omsmap", &self.id);
392
393        let mut chunk_map = ChunkMap::create(map_name, chunks)?;
394
395        tracing::info!("Queing {} chunks", chunks);
396
397        let mut chunk_queue = VecDeque::new();
398
399        chunk_map.for_each_inprogress(|i, _s| {
400            dbg!(i);
401            chunk_queue.push_back(i);
402            Ok(())
403        })?;
404
405        chunk_map.for_each_todo(|i, _s| {
406            chunk_queue.push_back(i);
407            Ok(())
408        })?;
409
410        let chunk_progress = if let Some(mp) = &self.progress {
411            let cl = chunk_queue.len();
412
413            let sty = ProgressStyle::with_template(
414                "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} [{eta_precise}] {msg}",
415            )
416            .unwrap()
417            .progress_chars("##-");
418
419            let progress = mp.add(ProgressBar::new(cl as u64));
420            progress.set_style(sty.clone());
421
422            Some(progress)
423        } else {
424            None
425        };
426
427        while let Some(c) = chunk_queue.pop_front() {
428            //tracing::info!("Downloading chunk {} / {}", c, chunks);
429            if let Some(pb) = &chunk_progress {
430                pb.set_message(format!("Downloading chunk {} / {}", c, chunks));
431            }
432
433            {
434                // :TODO: extract
435                chunk_map.set_chunk_state(c, ChunkState::InProgress)?;
436
437                let block_progress = if let Some(mp) = &self.progress {
438                    let sty = ProgressStyle::with_template(
439                        "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} [{eta_precise}] {msg}",
440                    )
441                    .unwrap()
442                    .progress_chars("##-");
443
444                    let progress = mp.add(ProgressBar::new(BLOCKS_PER_CHUNK as u64));
445                    progress.set_style(sty.clone());
446
447                    Some(progress)
448                } else {
449                    None
450                };
451
452                let client = self.ebs_client().await?; //aws_sdk_ebs::Client::new(&config);
453
454                let first_block_in_chunk = (c * BLOCKS_PER_CHUNK) as i32;
455                let last_block_in_chunk =
456                    (first_block_in_chunk + BLOCKS_PER_CHUNK as i32 - 1) as i32;
457
458                let list = client
459                    .list_snapshot_blocks()
460                    .snapshot_id(&self.id)
461                    .starting_block_index(first_block_in_chunk)
462                    .max_results(BLOCKS_PER_CHUNK as i32);
463                let list = list.send().await?;
464
465                // :TODO: verify block size
466                for block in &list.blocks.unwrap() {
467                    match (block.block_index, &block.block_token) {
468                        (Some(i), Some(t)) => {
469                            // Note: snapshots are sparse, so empty blocks will be skipped
470                            // resulting in bleeding into the next chunk here
471                            // Plan: A different approach on slicing/chunking this might be better
472
473                            if i >= first_block_in_chunk && i <= last_block_in_chunk {
474                                // tracing::info!("Downloading block {}", i);
475                                if let Some(pb) = &block_progress {
476                                    pb.set_message(format!(
477                                        "Downloading block {} [{}-{}]",
478                                        i, first_block_in_chunk, last_block_in_chunk
479                                    ));
480                                }
481
482                                let block = client
483                                    .get_snapshot_block()
484                                    .snapshot_id(&self.id)
485                                    .block_index(i)
486                                    .block_token(t);
487
488                                let block = block.send().await?;
489
490                                //dbg!(block);
491                                let p = i as u64 * BLOCK_SIZE as u64;
492
493                                f.seek(SeekFrom::Start(p as u64))?;
494                                //        let r = u8::read_from(block.block_data)?;
495                                let data = block.block_data.collect().await?;
496                                //io::copy(&mut data, &mut f)?;
497                                f.write(&data.into_bytes())?;
498
499                                if let Some(block_progress) = &block_progress {
500                                    block_progress.inc(1);
501                                }
502                            }
503                        }
504                        _ => {
505                            // :TODO:
506                        }
507                    }
508                }
509                chunk_map.set_chunk_state(c, ChunkState::Done)?;
510                if let Some(chunk_progress) = &chunk_progress {
511                    chunk_progress.inc(1);
512                }
513            }
514        }
515
516        Ok(())
517    }
518}