photon_indexer/snapshot/
mod.rs

1use std::{
2    env::temp_dir,
3    fs::{self, File, OpenOptions},
4    io::{BufReader, Error, ErrorKind, Read, Write},
5    path::PathBuf,
6    pin::Pin,
7    sync::Arc,
8    task::Poll,
9};
10
11pub use crate::common::{
12    fetch_block_parent_slot, get_network_start_slot, setup_logging, setup_metrics, LoggingFormat,
13};
14use crate::ingester::{
15    fetchers::BlockStreamConfig,
16    parser::ACCOUNT_COMPRESSION_PROGRAM_ID,
17    typedefs::block_info::{BlockInfo, Instruction, TransactionInfo},
18};
19use anyhow::{anyhow, Context as AnyhowContext, Result};
20use async_stream::stream;
21use bytes::{BufMut, Bytes};
22use futures::stream::StreamExt;
23use futures::{pin_mut, stream, Stream};
24use log::info;
25use s3::creds::Credentials;
26use s3::region::Region;
27use s3::{bucket::Bucket, BucketConfiguration};
28use s3_utils::multipart_upload::put_object_stream_custom;
29use tokio::io::{AsyncRead, ReadBuf};
30pub mod s3_utils;
31
32pub const MEGABYTE: usize = 1024 * 1024;
33pub const CHUNK_SIZE: usize = 100 * 1024 * 1024;
// Number of transactions to accumulate before yielding a batch of blocks
// (bounds memory use when replaying a snapshot).
35pub const TRANSACTIONS_TO_ACCUMULATE: usize = 5000;
36
37const SNAPSHOT_VERSION: u8 = 1;
38
/// Directory adapter backed by a Cloudflare R2 (S3-compatible) bucket.
pub struct R2DirectoryAdapter {
    /// Bucket handle used for all object operations.
    pub r2_bucket: Bucket,
    /// Key prefix prepended to snapshot file names on writes.
    pub r2_prefix: String,
}
43
/// Arguments for connecting to (and optionally creating) an R2 bucket.
pub struct R2BucketArgs {
    /// Credentials used to authenticate against R2.
    pub r2_credentials: Credentials,
    /// R2 region (carries the Cloudflare account id).
    pub r2_region: Region,
    /// Name of the bucket.
    pub r2_bucket: String,
    /// When true, create the bucket if it does not already exist.
    pub create_bucket: bool,
}
50
51pub async fn get_r2_bucket(args: R2BucketArgs) -> Bucket {
52    let bucket = Bucket::new(
53        args.r2_bucket.as_str(),
54        args.r2_region.clone(),
55        args.r2_credentials.clone(),
56    )
57    .unwrap()
58    .with_path_style();
59    if args.create_bucket {
60        // Check if the bucket already exists
61        let bucket_exists = bucket.exists().await.unwrap();
62        if !bucket_exists {
63            Bucket::create_with_path_style(
64                args.r2_bucket.as_str(),
65                args.r2_region.clone(),
66                args.r2_credentials.clone(),
67                BucketConfiguration::default(),
68            )
69            .await
70            .unwrap();
71        }
72    }
73    bucket
74}
75
/// Adapts a `Stream` of byte chunks into a type that can implement
/// `AsyncRead`, buffering chunk bytes that do not fit the reader's buffer.
struct StreamReader<S> {
    /// Source stream of byte chunks.
    stream: S,
    /// Bytes received from the stream but not yet handed to the reader.
    byte_buffer: Vec<u8>,
}
80
impl<S> AsyncRead for StreamReader<S>
where
    S: stream::Stream<Item = Result<Bytes, std::io::Error>> + Unpin,
{
    /// Bridges the inner fallible `Bytes` stream into an `AsyncRead`.
    ///
    /// Leftover bytes that did not fit into `buf` on a previous call are kept
    /// in `byte_buffer` and served first on the next poll.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        // Serve previously buffered bytes before polling the stream again.
        if !self.byte_buffer.is_empty() {
            let len = std::cmp::min(self.byte_buffer.len(), buf.remaining_mut());
            buf.put_slice(&self.byte_buffer[..len]);
            // NOTE(review): `drain` shifts the remaining bytes left, which is
            // O(n) per read; a cursor index would avoid it if this shows up
            // in profiles.
            self.byte_buffer.drain(..len);
            return Poll::Ready(Ok(()));
        }
        match futures::ready!(self.stream.poll_next_unpin(cx)) {
            Some(Ok(chunk)) => {
                // Stash the whole chunk, then copy as much as fits into `buf`;
                // the remainder is served on subsequent polls.
                self.byte_buffer.extend_from_slice(chunk.as_ref());
                let len = std::cmp::min(self.byte_buffer.len(), buf.remaining_mut());
                buf.put_slice(&self.byte_buffer[..len]);
                self.byte_buffer.drain(..len);
                // NOTE(review): an empty chunk from the stream returns Ready
                // with zero bytes written, which `AsyncRead` consumers treat
                // as EOF — confirm upstream never yields empty `Bytes`.
                Poll::Ready(Ok(()))
            }
            Some(Err(e)) => Poll::Ready(Err(e)),
            None => Poll::Ready(Ok(())), // EOF: stream exhausted, nothing written.
        }
    }
}
109
110impl R2DirectoryAdapter {
111    async fn read_file(
112        arc_self: Arc<Self>,
113        path: String,
114    ) -> impl Stream<Item = Result<Bytes>> + std::marker::Send + 'static {
115        stream! {
116            let r2_directory_adapter = arc_self.clone();
117            let mut result = r2_directory_adapter.r2_bucket.get_object_stream(path.clone()).await.with_context(|| format!("Failed to read file: {:?}", path))?;
118            let stream = result.bytes();
119
120            while let Some(byte) = stream.next().await {
121                let byte = byte.with_context(|| "Failed to read byte from file").unwrap();
122                yield Ok(byte);
123            }
124        }
125    }
126
127    async fn list_files(&self) -> Result<Vec<String>> {
128        let results = self
129            .r2_bucket
130            .list(self.r2_prefix.clone(), None)
131            .await
132            .unwrap_or_default();
133
134        let mut files = Vec::new();
135        for result in results {
136            for object in result.contents {
137                files.push(object.key);
138            }
139        }
140        Ok(files)
141    }
142
143    async fn delete_file(&self, path: String) -> Result<()> {
144        self.r2_bucket.delete_object(path).await?;
145        Ok(())
146    }
147
148    async fn write_file(
149        &self,
150        path: String,
151        byte_stream: impl Stream<Item = Result<Bytes>> + std::marker::Send + 'static,
152    ) -> Result<()> {
153        let path = format!("{}/{}", self.r2_prefix, path);
154
155        pin_mut!(byte_stream);
156        // Create a stream that converts `Result<u8, S3Error>` to `Result<Vec<u8>, S3Error>`
157        let byte_stream =
158            byte_stream.map(|bytes| bytes.map_err(|e| Error::new(ErrorKind::Other, e)));
159
160        let mut stream_reader = StreamReader {
161            stream: byte_stream,
162            byte_buffer: Vec::new(),
163        };
164        // Stream the bytes directly to S3 without collecting them in memory
165        put_object_stream_custom(&self.r2_bucket, &mut stream_reader, &path).await?;
166        Ok(())
167    }
168}
169
/// Directory adapter backed by a local filesystem directory.
///
/// NOTE(review): "Apapter" is a typo for "Adapter"; the name is kept as-is
/// because the type is public and renaming would break callers.
pub struct FileSystemDirectoryApapter {
    /// Directory in which snapshot files are stored.
    pub snapshot_dir: String,
}
173
174impl FileSystemDirectoryApapter {
175    async fn read_file(&self, path: String) -> impl Stream<Item = Result<Bytes>> + Send {
176        let path = format!("{}/{}", self.snapshot_dir, path);
177        let file = OpenOptions::new().read(true).open(path).unwrap();
178        let bytes = BufReader::new(file).bytes();
179        stream! {
180            let mut byte_chunk = vec![];
181            for byte in bytes.into_iter() {
182                byte_chunk.push(byte.with_context(|| "Failed to read byte from file")?);
183                if byte_chunk.len() == CHUNK_SIZE {
184                    yield Ok(Bytes::from(byte_chunk.clone()));
185                    byte_chunk.clear();
186                }
187            }
188            if !byte_chunk.is_empty() {
189                yield Ok(Bytes::from(byte_chunk));
190            }
191        }
192    }
193
194    async fn list_files(&self) -> Result<Vec<String>> {
195        if !PathBuf::new().join(&self.snapshot_dir).exists() {
196            return Ok(Vec::new());
197        }
198        let files = fs::read_dir(&self.snapshot_dir)
199            .with_context(|| format!("Failed to read directory: {:?}", self.snapshot_dir))?;
200        let mut file_names = Vec::new();
201        for file in files {
202            let file = file?;
203            let file_name = file.file_name().into_string().unwrap();
204            file_names.push(file_name);
205        }
206        Ok(file_names)
207    }
208
209    async fn delete_file(&self, path: String) -> Result<()> {
210        let path = format!("{}/{}", self.snapshot_dir, path);
211        fs::remove_file(path.clone())
212            .with_context(|| format!("Failed to delete file: {:?}", path))?;
213        Ok(())
214    }
215
216    async fn write_file(
217        &self,
218        path: String,
219        bytes: impl Stream<Item = Result<Bytes>>,
220    ) -> Result<()> {
221        let (mut temp_file, temp_path) = create_temp_snapshot_file(&self.snapshot_dir);
222        pin_mut!(bytes);
223        while let Some(byte) = bytes.next().await {
224            let byte = byte?;
225            temp_file.write_all(&byte).unwrap();
226        }
227
228        // Create snapshot directory if it doesn't exist
229        if !PathBuf::new().join(&self.snapshot_dir).exists() {
230            fs::create_dir_all(&self.snapshot_dir).unwrap();
231        }
232        let path = format!("{}/{}", self.snapshot_dir, path);
233        fs::rename(temp_path.clone(), path.clone())
234            .with_context(|| format!("Failed to rename file: {:?} -> {:?}", temp_path, path))?;
235        Ok(())
236    }
237}
238
/// Struct representing a directory adapter that can read and write files.
/// Exactly one of the two backends is populated (enforced in `new`).
/// HACK: This should definitely be a trait, but we used a struct to get around some cryptic
///       compiler errors
pub struct DirectoryAdapter {
    /// Local-filesystem backend, if configured.
    filesystem_directory_adapter: Option<Arc<FileSystemDirectoryApapter>>,
    /// Cloudflare R2 backend, if configured.
    r2_directory_adapter: Option<Arc<R2DirectoryAdapter>>,
}
246
247impl DirectoryAdapter {
248    pub fn new(
249        filesystem_directory_adapter: Option<FileSystemDirectoryApapter>,
250        r2_directory_adapter: Option<R2DirectoryAdapter>,
251    ) -> Self {
252        match (&filesystem_directory_adapter, &r2_directory_adapter) {
253            (Some(_snapshot_dir), None) => {}
254            (None, Some(_r2_bucket)) => {}
255            _ => panic!("Either snapshot_dir or r2_bucket must be provided"),
256        };
257
258        Self {
259            filesystem_directory_adapter: filesystem_directory_adapter.map(Arc::new),
260            r2_directory_adapter: r2_directory_adapter.map(Arc::new),
261        }
262    }
263
264    pub fn from_local_directory(snapshot_dir: String) -> Self {
265        Self::new(Some(FileSystemDirectoryApapter { snapshot_dir }), None)
266    }
267
268    pub async fn from_r2_bucket_and_prefix_and_env(r2_bucket: String, r2_prefix: String) -> Self {
269        // Get endpoint url, access key, region, and secret key from environment variables
270        let r2_credentials = Credentials::new(
271            Some(&std::env::var("R2_ACCESS_KEY").unwrap()),
272            Some(&std::env::var("R2_SECRET_KEY").unwrap()),
273            None,
274            None,
275            None,
276        )
277        .unwrap();
278        let r2_region = Region::R2 {
279            account_id: std::env::var("R2_ACCOUNT_ID").unwrap(),
280        };
281        let r2_bucket_args = R2BucketArgs {
282            r2_credentials,
283            r2_region,
284            r2_bucket,
285            create_bucket: false,
286        };
287        let r2_bucket = get_r2_bucket(r2_bucket_args).await;
288        Self::new(
289            None,
290            Some(R2DirectoryAdapter {
291                r2_bucket,
292                r2_prefix,
293            }),
294        )
295    }
296
297    /// Reads the contents of a file at the given path
298    async fn read_file(&self, path: String) -> impl Stream<Item = Result<Bytes>> + 'static {
299        let file_system_directory_adapter = self.filesystem_directory_adapter.clone();
300        let r2_directory_adapter = self.r2_directory_adapter.clone();
301        stream! {
302            if let Some(filesystem_directory_adapter) = file_system_directory_adapter {
303                let stream = filesystem_directory_adapter.read_file(path).await;
304                pin_mut!(stream);
305                while let Some(byte) = stream.next().await {
306                    yield byte;
307                }
308            } else if let Some(r2_directory_adapter) = r2_directory_adapter {
309                let stream = R2DirectoryAdapter::read_file(r2_directory_adapter, path).await;
310                pin_mut!(stream);
311                while let Some(byte) = stream.next().await {
312                    yield byte;
313                }
314            } else {
315                panic!("No directory adapter provided");
316            }
317        }
318    }
319
320    /// Writes data to a file at the given path
321    async fn list_files(&self) -> Result<Vec<String>> {
322        if let Some(filesystem_directory_adapter) = &self.filesystem_directory_adapter {
323            filesystem_directory_adapter.list_files().await
324        } else if let Some(r2_directory_adapter) = &self.r2_directory_adapter {
325            r2_directory_adapter.list_files().await
326        } else {
327            panic!("No directory adapter provided");
328        }
329    }
330
331    /// Deletes the file at the given path
332    pub async fn delete_file(&self, path: String) -> Result<()> {
333        if let Some(filesystem_directory_adapter) = &self.filesystem_directory_adapter {
334            filesystem_directory_adapter.delete_file(path).await
335        } else if let Some(r2_directory_adapter) = &self.r2_directory_adapter {
336            r2_directory_adapter.delete_file(path).await
337        } else {
338            panic!("No directory adapter provided");
339        }
340    }
341
342    /// Write file to the given path
343    async fn write_file(
344        &self,
345        path: String,
346        bytes: impl Stream<Item = Result<Bytes>> + std::marker::Send + 'static,
347    ) -> Result<()> {
348        if let Some(filesystem_directory_adapter) = &self.filesystem_directory_adapter {
349            filesystem_directory_adapter.write_file(path, bytes).await
350        } else if let Some(r2_directory_adapter) = &self.r2_directory_adapter {
351            r2_directory_adapter.write_file(path, bytes).await
352        } else {
353            panic!("No directory adapter provided");
354        }
355    }
356}
357
358fn is_compression_instruction(instruction: &Instruction) -> bool {
359    instruction.program_id == ACCOUNT_COMPRESSION_PROGRAM_ID
360        || instruction
361            .accounts
362            .contains(&ACCOUNT_COMPRESSION_PROGRAM_ID)
363}
364
365pub fn is_compression_transaction(tx: &TransactionInfo) -> bool {
366    for instruction_group in &tx.instruction_groups {
367        if is_compression_instruction(&instruction_group.outer_instruction) {
368            return true;
369        }
370        for instruction in &instruction_group.inner_instructions {
371            if is_compression_instruction(instruction) {
372                return true;
373            }
374        }
375    }
376    false
377}
378
/// A snapshot file name together with the slot range parsed from it.
#[derive(Debug)]
pub struct SnapshotFileWithSlots {
    /// File name (or full object key, for R2) of the snapshot file.
    pub file: String,
    /// First slot covered by this snapshot file.
    pub start_slot: u64,
    /// Last slot covered by this snapshot file.
    pub end_slot: u64,
}
385
386pub async fn get_snapshot_files_with_metadata(
387    directory_adapter: &DirectoryAdapter,
388) -> anyhow::Result<Vec<SnapshotFileWithSlots>> {
389    let snapshot_files = directory_adapter.list_files().await?;
390    let mut snapshot_files_with_slots = Vec::new();
391
392    for file in snapshot_files {
393        // Make this return an error if file name is not in the expected format
394        let parts: Vec<&str> = file.split('-').collect();
395        if parts.len() == 3 {
396            let start_slot = parts[1].parse::<u64>()?;
397            let end_slot = parts[2].parse::<u64>()?;
398            snapshot_files_with_slots.push(SnapshotFileWithSlots {
399                file,
400                start_slot,
401                end_slot,
402            });
403        }
404    }
405    snapshot_files_with_slots.sort_by_key(|file| file.start_slot);
406    Ok(snapshot_files_with_slots)
407}
408
409fn create_temp_snapshot_file(dir: &str) -> (File, PathBuf) {
410    let temp_dir = temp_dir();
411    // Create a subdirectory for the snapshot files
412    let temp_dir = temp_dir.join(dir);
413    if !temp_dir.exists() {
414        fs::create_dir(&temp_dir).unwrap();
415    }
416    let random_number = rand::random::<u64>();
417    let temp_file_path = temp_dir.join(format!("temp-snapshot-{}", random_number));
418    if temp_file_path.exists() {
419        fs::remove_file(&temp_file_path).unwrap();
420    }
421    let temp_file = File::create(&temp_file_path).unwrap();
422    (temp_file, temp_file_path)
423}
424
425async fn merge_snapshots(directory_adapter: Arc<DirectoryAdapter>) {
426    let snapshot_files = get_snapshot_files_with_metadata(directory_adapter.as_ref())
427        .await
428        .unwrap();
429    let start_slot = snapshot_files.first().map(|file| file.start_slot).unwrap();
430    let end_slot = snapshot_files.last().map(|file| file.end_slot).unwrap();
431    info!(
432        "Merging snapshots from slot {} to slot {}",
433        start_slot, end_slot
434    );
435    let byte_stream = load_byte_stream_from_directory_adapter(directory_adapter.clone()).await;
436    create_snapshot_from_byte_stream(byte_stream, directory_adapter.as_ref())
437        .await
438        .unwrap();
439    for snapshot_file in snapshot_files {
440        directory_adapter
441            .delete_file(snapshot_file.file)
442            .await
443            .unwrap();
444    }
445}
446
447pub async fn update_snapshot(
448    directory_adapter: Arc<DirectoryAdapter>,
449    block_stream_config: BlockStreamConfig,
450    full_snapshot_interval_slots: u64,
451    incremental_snapshot_interval_slots: u64,
452) {
453    // Convert stream to iterator
454    let block_stream = block_stream_config.load_block_stream();
455    update_snapshot_helper(
456        directory_adapter,
457        block_stream,
458        block_stream_config.last_indexed_slot,
459        incremental_snapshot_interval_slots,
460        full_snapshot_interval_slots,
461    )
462    .await;
463}
464
/// Consumes `blocks_stream`, writing an incremental snapshot file every
/// `incremental_snapshot_interval_slots` slots and collapsing all files into
/// a full snapshot every `full_snapshot_interval_slots` slots.
///
/// Blocks are trimmed to their compression-related transactions before being
/// serialized with bincode.
pub async fn update_snapshot_helper(
    directory_adapter: Arc<DirectoryAdapter>,
    blocks_stream: impl Stream<Item = Vec<BlockInfo>>,
    last_indexed_slot: u64,
    incremental_snapshot_interval_slots: u64,
    full_snapshot_interval_slots: u64,
) {
    let snapshot_files = get_snapshot_files_with_metadata(directory_adapter.as_ref())
        .await
        .unwrap();

    // Resume from the slot ranges of existing snapshot files when present,
    // otherwise from the caller-provided last indexed slot.
    let mut last_full_snapshot_slot = snapshot_files
        .first()
        .map(|file| file.end_slot)
        .unwrap_or(last_indexed_slot);
    let mut last_snapshot_slot = snapshot_files
        .last()
        .map(|file| file.end_slot)
        .unwrap_or(last_indexed_slot);

    // Bincode-serialized blocks accumulated since the last snapshot write.
    let mut byte_buffer = Vec::new();

    pin_mut!(blocks_stream);
    while let Some(blocks) = blocks_stream.next().await {
        for block in blocks {
            let slot = block.metadata.slot;
            // The `(x == 0) as u64` term widens the interval check by one
            // slot when starting from slot 0.
            // NOTE(review): the full-snapshot condition tests
            // `last_indexed_slot == 0` while the incremental one tests
            // `last_snapshot_slot == 0` — confirm this asymmetry is intended.
            let write_full_snapshot = slot - last_full_snapshot_slot
                + (last_indexed_slot == 0) as u64
                >= full_snapshot_interval_slots;
            let write_incremental_snapshot = slot - last_snapshot_slot
                + (last_snapshot_slot == 0) as u64
                >= incremental_snapshot_interval_slots;

            // Keep only compression-related transactions in the snapshot.
            let trimmed_block = BlockInfo {
                metadata: block.metadata.clone(),
                transactions: block
                    .transactions
                    .iter()
                    .filter(|tx| is_compression_transaction(tx))
                    .cloned()
                    .collect(),
            };
            let block_bytes = bincode::serialize(&trimmed_block).unwrap();
            byte_buffer.extend(block_bytes);

            if write_incremental_snapshot {
                let snapshot_file_path = format!("snapshot-{}-{}", last_snapshot_slot + 1, slot);
                info!("Writing snapshot file: {}", snapshot_file_path);
                let byte_buffer_clone = byte_buffer.clone();
                let byte_stream = stream! {
                    yield Ok(Bytes::from(byte_buffer_clone));
                };
                directory_adapter
                    .as_ref()
                    .write_file(snapshot_file_path, byte_stream)
                    .await
                    .unwrap();
                byte_buffer.clear();
                last_snapshot_slot = slot;
            }
            if write_full_snapshot {
                // Collapse all snapshot files (including any file written
                // just above) into a single full snapshot.
                merge_snapshots(directory_adapter.clone()).await;
                last_full_snapshot_slot = slot;
            }
        }
    }
}
532
533pub async fn load_byte_stream_from_directory_adapter(
534    directory_adapter: Arc<DirectoryAdapter>,
535) -> impl Stream<Item = Result<Bytes>> + 'static {
536    // Create an asynchronous stream of bytes from the snapshot files
537    stream! {
538        let snapshot_files =
539            get_snapshot_files_with_metadata(directory_adapter.as_ref()).await.context("Failed to retrieve snapshot files")?;
540        if snapshot_files.is_empty() {
541            yield Err(anyhow!("No snapshot files found"));
542        }
543
544        // Yield the snapshot version byte
545        yield Ok(Bytes::from(vec![SNAPSHOT_VERSION]));
546
547        let start_slot = snapshot_files.first().map(|file| file.start_slot).unwrap();
548        let end_slot = snapshot_files.last().map(|file| file.end_slot).unwrap();
549
550        let start_slot = start_slot.to_le_bytes();
551        let end_slot = end_slot.to_le_bytes();
552        yield Ok(Bytes::from(start_slot.to_vec()));
553        yield Ok(Bytes::from(end_slot.to_vec()));
554
555        // Iterate over each snapshot file
556        for snapshot_file in snapshot_files {
557            // Use anyhow context to add more error information
558            let byte_stream = directory_adapter.read_file(snapshot_file.file.clone()).await;
559            pin_mut!(byte_stream);
560            while let Some(byte) = byte_stream.next().await {
561                yield byte;
562            }
563        }
564    }
565}
566
/// Deserializes the snapshot byte stream back into batches of `BlockInfo`,
/// yielding a batch once it holds at least `TRANSACTIONS_TO_ACCUMULATE`
/// transactions.
///
/// # Panics
/// Panics on an unsupported snapshot version, a truncated stream, or a
/// bincode deserialization failure.
pub async fn load_block_stream_from_directory_adapter(
    directory_adapter: Arc<DirectoryAdapter>,
) -> impl Stream<Item = Vec<BlockInfo>> {
    stream! {
        let byte_stream = load_byte_stream_from_directory_adapter(directory_adapter.clone()).await;
        pin_mut!(byte_stream);
        // The first stream item carries the snapshot version byte.
        let snapshot_version = byte_stream.next().await.unwrap().unwrap();
        let snapshot_version = snapshot_version[0];


        if snapshot_version != SNAPSHOT_VERSION {
            panic!("Unsupported snapshot version: {}. Please upgrade Photon package", snapshot_version);
        }
        // Skip the start slot and end slot.
        // NOTE(review): assumes the two slot values arrive as exactly the
        // next two stream items, matching how the byte stream yields them.
        for _ in 0..2 {
            byte_stream.next().await.unwrap().unwrap();
        }

        // `reader` buffers raw bytes; `index` is the offset of the first
        // byte not yet deserialized.
        let mut reader = Vec::new();
        let mut index = 0;
        let mut accumulated_blocks = Vec::new();
        let mut accumulated_transactions = 0;

        while let Some(bytes) = byte_stream.next().await {
            let bytes = bytes.unwrap();
            reader.extend(&bytes);
            // Only deserialize while more than CHUNK_SIZE unparsed bytes are
            // buffered.
            // NOTE(review): this presumes a serialized block never exceeds
            // CHUNK_SIZE, so a complete block is always available — confirm.
            while reader.len() - index > CHUNK_SIZE {
                let block: BlockInfo = bincode::deserialize(&reader[index..]).unwrap();
                // Advance by the block's serialized size (bincode's
                // deserialize does not report how many bytes it consumed).
                let size = bincode::serialized_size(&block).unwrap() as usize;
                index += size;
                accumulated_transactions += block.transactions.len();
                accumulated_blocks.push(block);
                if accumulated_transactions >= TRANSACTIONS_TO_ACCUMULATE {
                    yield accumulated_blocks;
                    accumulated_blocks = Vec::new();
                    accumulated_transactions = 0;
                }
            }
            // Drop consumed bytes so the buffer does not grow without bound.
            if index > 0 {
                reader.drain(..index);
                index = 0;
            }
        }

        // Stream exhausted: drain the remaining buffered blocks.
        while index < reader.len() {
            let block: BlockInfo = bincode::deserialize(&reader[index..]).unwrap();
            let size = bincode::serialized_size(&block).unwrap() as usize;
            index += size;
            accumulated_transactions += block.transactions.len();
            accumulated_blocks.push(block);
            if accumulated_transactions >= TRANSACTIONS_TO_ACCUMULATE {
                yield accumulated_blocks;
                accumulated_blocks = Vec::new();
                accumulated_transactions = 0;
            }
        }

        if !accumulated_blocks.is_empty() {
            yield accumulated_blocks;
        }
    }
}
630
631pub async fn create_snapshot_from_byte_stream(
632    byte_stream: impl Stream<Item = Result<Bytes, anyhow::Error>> + std::marker::Send + 'static,
633    directory_adapter: &DirectoryAdapter,
634) -> Result<()> {
635    // Skip snapshot version byte
636    let mut byte_stream: Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>> =
637        Box::pin(byte_stream);
638
639    let mut byte_buffer = Vec::new();
640    while let Some(byte) = byte_stream.next().await {
641        let byte = byte?;
642        byte_buffer.extend(byte.iter().copied());
643        // 1 byte for version, 8 bytes for start slot, 8 bytes for end slot
644        if byte_buffer.len() > 17 {
645            break;
646        }
647    }
648    // Snapshot version is the first byte
649    let snapshot_version = byte_buffer.remove(0);
650
651    if snapshot_version != SNAPSHOT_VERSION {
652        panic!(
653            "Unsupported snapshot version: {}. Please upgrade Photon package",
654            snapshot_version
655        );
656    }
657    let start_slot_bytes: [u8; 8] = byte_buffer
658        .drain(..8)
659        .collect::<Vec<u8>>()
660        .try_into()
661        .unwrap();
662    let start_slot = u64::from_le_bytes(start_slot_bytes);
663    let end_slot_bytes: [u8; 8] = byte_buffer
664        .drain(..8)
665        .collect::<Vec<u8>>()
666        .try_into()
667        .unwrap();
668    let end_slot = u64::from_le_bytes(end_slot_bytes);
669    let snapshot_name = format!("snapshot-{}-{}", start_slot, end_slot);
670    info!("Creating snapshot: {}", snapshot_name);
671    let byte_stream = stream! {
672        yield Ok(Bytes::from(byte_buffer));
673        while let Some(byte) = byte_stream.next().await {
674            yield byte;
675        }
676    };
677    directory_adapter
678        .write_file(snapshot_name.clone(), byte_stream)
679        .await?;
680
681    info!("Snapshot downloaded successfully to {:?}", snapshot_name);
682    Ok(())
683}