1use std::{
2 env::temp_dir,
3 fs::{self, File, OpenOptions},
4 io::{BufReader, Error, ErrorKind, Read, Write},
5 path::PathBuf,
6 pin::Pin,
7 sync::Arc,
8 task::Poll,
9};
10
11pub use crate::common::{
12 fetch_block_parent_slot, get_network_start_slot, setup_logging, setup_metrics, LoggingFormat,
13};
14use crate::ingester::{
15 fetchers::BlockStreamConfig,
16 parser::ACCOUNT_COMPRESSION_PROGRAM_ID,
17 typedefs::block_info::{BlockInfo, Instruction, TransactionInfo},
18};
19use anyhow::{anyhow, Context as AnyhowContext, Result};
20use async_stream::stream;
21use bytes::{BufMut, Bytes};
22use futures::stream::StreamExt;
23use futures::{pin_mut, stream, Stream};
24use log::info;
25use s3::creds::Credentials;
26use s3::region::Region;
27use s3::{bucket::Bucket, BucketConfiguration};
28use s3_utils::multipart_upload::put_object_stream_custom;
29use tokio::io::{AsyncRead, ReadBuf};
30pub mod s3_utils;
31
/// Number of bytes in one mebibyte.
pub const MEGABYTE: usize = 1024 * 1024;
/// Maximum size of each chunk emitted when streaming snapshot files.
/// Expressed in terms of `MEGABYTE` for consistency.
pub const CHUNK_SIZE: usize = 100 * MEGABYTE;
/// Number of transactions to batch together before yielding a group of
/// blocks when replaying a snapshot.
pub const TRANSACTIONS_TO_ACCUMULATE: usize = 5000;

/// Version byte written at the head of every snapshot; bump on format changes.
const SNAPSHOT_VERSION: u8 = 1;
38
/// Snapshot storage backend backed by a Cloudflare R2 (S3-compatible) bucket.
pub struct R2DirectoryAdapter {
    // Handle to the R2 bucket used for all object operations.
    pub r2_bucket: Bucket,
    // Key prefix under which snapshot objects are written and listed.
    pub r2_prefix: String,
}
43
/// Arguments for constructing (and optionally creating) an R2 bucket handle.
pub struct R2BucketArgs {
    // Credentials used to authenticate against R2.
    pub r2_credentials: Credentials,
    // R2 region (carries the account id).
    pub r2_region: Region,
    // Name of the bucket to open or create.
    pub r2_bucket: String,
    // When true, create the bucket if it does not already exist.
    pub create_bucket: bool,
}
50
51pub async fn get_r2_bucket(args: R2BucketArgs) -> Bucket {
52 let bucket = Bucket::new(
53 args.r2_bucket.as_str(),
54 args.r2_region.clone(),
55 args.r2_credentials.clone(),
56 )
57 .unwrap()
58 .with_path_style();
59 if args.create_bucket {
60 let bucket_exists = bucket.exists().await.unwrap();
62 if !bucket_exists {
63 Bucket::create_with_path_style(
64 args.r2_bucket.as_str(),
65 args.r2_region.clone(),
66 args.r2_credentials.clone(),
67 BucketConfiguration::default(),
68 )
69 .await
70 .unwrap();
71 }
72 }
73 bucket
74}
75
/// Adapts a `Stream` of byte chunks into an `AsyncRead` so it can be fed to
/// the multipart-upload helper.
struct StreamReader<S> {
    // Underlying stream of byte chunks.
    stream: S,
    // Bytes received from the stream but not yet handed out to a reader.
    byte_buffer: Vec<u8>,
}
80
impl<S> AsyncRead for StreamReader<S>
where
    S: stream::Stream<Item = Result<Bytes, std::io::Error>> + Unpin,
{
    /// Polls the inner stream for the next chunk and copies as much as fits
    /// into `buf`, stashing any overflow in `byte_buffer` for later reads.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        // Drain previously buffered bytes before polling the stream again.
        if !self.byte_buffer.is_empty() {
            let len = std::cmp::min(self.byte_buffer.len(), buf.remaining_mut());
            buf.put_slice(&self.byte_buffer[..len]);
            self.byte_buffer.drain(..len);
            return Poll::Ready(Ok(()));
        }
        match futures::ready!(self.stream.poll_next_unpin(cx)) {
            Some(Ok(chunk)) => {
                // Buffer the whole chunk, then hand out as much as fits now;
                // the remainder is served by the fast path above.
                self.byte_buffer.extend_from_slice(chunk.as_ref());
                let len = std::cmp::min(self.byte_buffer.len(), buf.remaining_mut());
                buf.put_slice(&self.byte_buffer[..len]);
                self.byte_buffer.drain(..len);
                Poll::Ready(Ok(()))
            }
            Some(Err(e)) => Poll::Ready(Err(e)),
            // Stream exhausted: returning Ok without writing bytes signals EOF.
            None => Poll::Ready(Ok(())),
        }
    }
}
109
110impl R2DirectoryAdapter {
111 async fn read_file(
112 arc_self: Arc<Self>,
113 path: String,
114 ) -> impl Stream<Item = Result<Bytes>> + std::marker::Send + 'static {
115 stream! {
116 let r2_directory_adapter = arc_self.clone();
117 let mut result = r2_directory_adapter.r2_bucket.get_object_stream(path.clone()).await.with_context(|| format!("Failed to read file: {:?}", path))?;
118 let stream = result.bytes();
119
120 while let Some(byte) = stream.next().await {
121 let byte = byte.with_context(|| "Failed to read byte from file").unwrap();
122 yield Ok(byte);
123 }
124 }
125 }
126
127 async fn list_files(&self) -> Result<Vec<String>> {
128 let results = self
129 .r2_bucket
130 .list(self.r2_prefix.clone(), None)
131 .await
132 .unwrap_or_default();
133
134 let mut files = Vec::new();
135 for result in results {
136 for object in result.contents {
137 files.push(object.key);
138 }
139 }
140 Ok(files)
141 }
142
143 async fn delete_file(&self, path: String) -> Result<()> {
144 self.r2_bucket.delete_object(path).await?;
145 Ok(())
146 }
147
148 async fn write_file(
149 &self,
150 path: String,
151 byte_stream: impl Stream<Item = Result<Bytes>> + std::marker::Send + 'static,
152 ) -> Result<()> {
153 let path = format!("{}/{}", self.r2_prefix, path);
154
155 pin_mut!(byte_stream);
156 let byte_stream =
158 byte_stream.map(|bytes| bytes.map_err(|e| Error::new(ErrorKind::Other, e)));
159
160 let mut stream_reader = StreamReader {
161 stream: byte_stream,
162 byte_buffer: Vec::new(),
163 };
164 put_object_stream_custom(&self.r2_bucket, &mut stream_reader, &path).await?;
166 Ok(())
167 }
168}
169
/// Snapshot storage backend backed by a local directory.
/// NOTE(review): "Apapter" is a typo for "Adapter", kept because the name is
/// part of the public interface used elsewhere.
pub struct FileSystemDirectoryApapter {
    // Directory in which snapshot files are stored.
    pub snapshot_dir: String,
}
173
174impl FileSystemDirectoryApapter {
175 async fn read_file(&self, path: String) -> impl Stream<Item = Result<Bytes>> + Send {
176 let path = format!("{}/{}", self.snapshot_dir, path);
177 let file = OpenOptions::new().read(true).open(path).unwrap();
178 let bytes = BufReader::new(file).bytes();
179 stream! {
180 let mut byte_chunk = vec![];
181 for byte in bytes.into_iter() {
182 byte_chunk.push(byte.with_context(|| "Failed to read byte from file")?);
183 if byte_chunk.len() == CHUNK_SIZE {
184 yield Ok(Bytes::from(byte_chunk.clone()));
185 byte_chunk.clear();
186 }
187 }
188 if !byte_chunk.is_empty() {
189 yield Ok(Bytes::from(byte_chunk));
190 }
191 }
192 }
193
194 async fn list_files(&self) -> Result<Vec<String>> {
195 if !PathBuf::new().join(&self.snapshot_dir).exists() {
196 return Ok(Vec::new());
197 }
198 let files = fs::read_dir(&self.snapshot_dir)
199 .with_context(|| format!("Failed to read directory: {:?}", self.snapshot_dir))?;
200 let mut file_names = Vec::new();
201 for file in files {
202 let file = file?;
203 let file_name = file.file_name().into_string().unwrap();
204 file_names.push(file_name);
205 }
206 Ok(file_names)
207 }
208
209 async fn delete_file(&self, path: String) -> Result<()> {
210 let path = format!("{}/{}", self.snapshot_dir, path);
211 fs::remove_file(path.clone())
212 .with_context(|| format!("Failed to delete file: {:?}", path))?;
213 Ok(())
214 }
215
216 async fn write_file(
217 &self,
218 path: String,
219 bytes: impl Stream<Item = Result<Bytes>>,
220 ) -> Result<()> {
221 let (mut temp_file, temp_path) = create_temp_snapshot_file(&self.snapshot_dir);
222 pin_mut!(bytes);
223 while let Some(byte) = bytes.next().await {
224 let byte = byte?;
225 temp_file.write_all(&byte).unwrap();
226 }
227
228 if !PathBuf::new().join(&self.snapshot_dir).exists() {
230 fs::create_dir_all(&self.snapshot_dir).unwrap();
231 }
232 let path = format!("{}/{}", self.snapshot_dir, path);
233 fs::rename(temp_path.clone(), path.clone())
234 .with_context(|| format!("Failed to rename file: {:?} -> {:?}", temp_path, path))?;
235 Ok(())
236 }
237}
238
/// Dispatches snapshot storage operations to exactly one configured backend:
/// either the local filesystem or an R2 bucket.
pub struct DirectoryAdapter {
    // Set when backed by a local directory; mutually exclusive with the R2 field.
    filesystem_directory_adapter: Option<Arc<FileSystemDirectoryApapter>>,
    // Set when backed by an R2 bucket; mutually exclusive with the filesystem field.
    r2_directory_adapter: Option<Arc<R2DirectoryAdapter>>,
}
246
247impl DirectoryAdapter {
248 pub fn new(
249 filesystem_directory_adapter: Option<FileSystemDirectoryApapter>,
250 r2_directory_adapter: Option<R2DirectoryAdapter>,
251 ) -> Self {
252 match (&filesystem_directory_adapter, &r2_directory_adapter) {
253 (Some(_snapshot_dir), None) => {}
254 (None, Some(_r2_bucket)) => {}
255 _ => panic!("Either snapshot_dir or r2_bucket must be provided"),
256 };
257
258 Self {
259 filesystem_directory_adapter: filesystem_directory_adapter.map(Arc::new),
260 r2_directory_adapter: r2_directory_adapter.map(Arc::new),
261 }
262 }
263
264 pub fn from_local_directory(snapshot_dir: String) -> Self {
265 Self::new(Some(FileSystemDirectoryApapter { snapshot_dir }), None)
266 }
267
268 pub async fn from_r2_bucket_and_prefix_and_env(r2_bucket: String, r2_prefix: String) -> Self {
269 let r2_credentials = Credentials::new(
271 Some(&std::env::var("R2_ACCESS_KEY").unwrap()),
272 Some(&std::env::var("R2_SECRET_KEY").unwrap()),
273 None,
274 None,
275 None,
276 )
277 .unwrap();
278 let r2_region = Region::R2 {
279 account_id: std::env::var("R2_ACCOUNT_ID").unwrap(),
280 };
281 let r2_bucket_args = R2BucketArgs {
282 r2_credentials,
283 r2_region,
284 r2_bucket,
285 create_bucket: false,
286 };
287 let r2_bucket = get_r2_bucket(r2_bucket_args).await;
288 Self::new(
289 None,
290 Some(R2DirectoryAdapter {
291 r2_bucket,
292 r2_prefix,
293 }),
294 )
295 }
296
297 async fn read_file(&self, path: String) -> impl Stream<Item = Result<Bytes>> + 'static {
299 let file_system_directory_adapter = self.filesystem_directory_adapter.clone();
300 let r2_directory_adapter = self.r2_directory_adapter.clone();
301 stream! {
302 if let Some(filesystem_directory_adapter) = file_system_directory_adapter {
303 let stream = filesystem_directory_adapter.read_file(path).await;
304 pin_mut!(stream);
305 while let Some(byte) = stream.next().await {
306 yield byte;
307 }
308 } else if let Some(r2_directory_adapter) = r2_directory_adapter {
309 let stream = R2DirectoryAdapter::read_file(r2_directory_adapter, path).await;
310 pin_mut!(stream);
311 while let Some(byte) = stream.next().await {
312 yield byte;
313 }
314 } else {
315 panic!("No directory adapter provided");
316 }
317 }
318 }
319
320 async fn list_files(&self) -> Result<Vec<String>> {
322 if let Some(filesystem_directory_adapter) = &self.filesystem_directory_adapter {
323 filesystem_directory_adapter.list_files().await
324 } else if let Some(r2_directory_adapter) = &self.r2_directory_adapter {
325 r2_directory_adapter.list_files().await
326 } else {
327 panic!("No directory adapter provided");
328 }
329 }
330
331 pub async fn delete_file(&self, path: String) -> Result<()> {
333 if let Some(filesystem_directory_adapter) = &self.filesystem_directory_adapter {
334 filesystem_directory_adapter.delete_file(path).await
335 } else if let Some(r2_directory_adapter) = &self.r2_directory_adapter {
336 r2_directory_adapter.delete_file(path).await
337 } else {
338 panic!("No directory adapter provided");
339 }
340 }
341
342 async fn write_file(
344 &self,
345 path: String,
346 bytes: impl Stream<Item = Result<Bytes>> + std::marker::Send + 'static,
347 ) -> Result<()> {
348 if let Some(filesystem_directory_adapter) = &self.filesystem_directory_adapter {
349 filesystem_directory_adapter.write_file(path, bytes).await
350 } else if let Some(r2_directory_adapter) = &self.r2_directory_adapter {
351 r2_directory_adapter.write_file(path, bytes).await
352 } else {
353 panic!("No directory adapter provided");
354 }
355 }
356}
357
358fn is_compression_instruction(instruction: &Instruction) -> bool {
359 instruction.program_id == ACCOUNT_COMPRESSION_PROGRAM_ID
360 || instruction
361 .accounts
362 .contains(&ACCOUNT_COMPRESSION_PROGRAM_ID)
363}
364
365pub fn is_compression_transaction(tx: &TransactionInfo) -> bool {
366 for instruction_group in &tx.instruction_groups {
367 if is_compression_instruction(&instruction_group.outer_instruction) {
368 return true;
369 }
370 for instruction in &instruction_group.inner_instructions {
371 if is_compression_instruction(instruction) {
372 return true;
373 }
374 }
375 }
376 false
377}
378
/// A snapshot file name together with the slot range parsed from it
/// (file names follow the pattern `snapshot-{start_slot}-{end_slot}`).
#[derive(Debug)]
pub struct SnapshotFileWithSlots {
    // Raw file name / object key.
    pub file: String,
    // First slot covered by this snapshot file.
    pub start_slot: u64,
    // Last slot covered by this snapshot file.
    pub end_slot: u64,
}
385
386pub async fn get_snapshot_files_with_metadata(
387 directory_adapter: &DirectoryAdapter,
388) -> anyhow::Result<Vec<SnapshotFileWithSlots>> {
389 let snapshot_files = directory_adapter.list_files().await?;
390 let mut snapshot_files_with_slots = Vec::new();
391
392 for file in snapshot_files {
393 let parts: Vec<&str> = file.split('-').collect();
395 if parts.len() == 3 {
396 let start_slot = parts[1].parse::<u64>()?;
397 let end_slot = parts[2].parse::<u64>()?;
398 snapshot_files_with_slots.push(SnapshotFileWithSlots {
399 file,
400 start_slot,
401 end_slot,
402 });
403 }
404 }
405 snapshot_files_with_slots.sort_by_key(|file| file.start_slot);
406 Ok(snapshot_files_with_slots)
407}
408
409fn create_temp_snapshot_file(dir: &str) -> (File, PathBuf) {
410 let temp_dir = temp_dir();
411 let temp_dir = temp_dir.join(dir);
413 if !temp_dir.exists() {
414 fs::create_dir(&temp_dir).unwrap();
415 }
416 let random_number = rand::random::<u64>();
417 let temp_file_path = temp_dir.join(format!("temp-snapshot-{}", random_number));
418 if temp_file_path.exists() {
419 fs::remove_file(&temp_file_path).unwrap();
420 }
421 let temp_file = File::create(&temp_file_path).unwrap();
422 (temp_file, temp_file_path)
423}
424
425async fn merge_snapshots(directory_adapter: Arc<DirectoryAdapter>) {
426 let snapshot_files = get_snapshot_files_with_metadata(directory_adapter.as_ref())
427 .await
428 .unwrap();
429 let start_slot = snapshot_files.first().map(|file| file.start_slot).unwrap();
430 let end_slot = snapshot_files.last().map(|file| file.end_slot).unwrap();
431 info!(
432 "Merging snapshots from slot {} to slot {}",
433 start_slot, end_slot
434 );
435 let byte_stream = load_byte_stream_from_directory_adapter(directory_adapter.clone()).await;
436 create_snapshot_from_byte_stream(byte_stream, directory_adapter.as_ref())
437 .await
438 .unwrap();
439 for snapshot_file in snapshot_files {
440 directory_adapter
441 .delete_file(snapshot_file.file)
442 .await
443 .unwrap();
444 }
445}
446
447pub async fn update_snapshot(
448 directory_adapter: Arc<DirectoryAdapter>,
449 block_stream_config: BlockStreamConfig,
450 full_snapshot_interval_slots: u64,
451 incremental_snapshot_interval_slots: u64,
452) {
453 let block_stream = block_stream_config.load_block_stream();
455 update_snapshot_helper(
456 directory_adapter,
457 block_stream,
458 block_stream_config.last_indexed_slot,
459 incremental_snapshot_interval_slots,
460 full_snapshot_interval_slots,
461 )
462 .await;
463}
464
/// Tails `blocks_stream` and maintains snapshots: serializes each block
/// (trimmed to compression transactions only), writes an incremental snapshot
/// file whenever the incremental interval elapses, and merges everything into
/// one full snapshot whenever the full interval elapses.
pub async fn update_snapshot_helper(
    directory_adapter: Arc<DirectoryAdapter>,
    blocks_stream: impl Stream<Item = Vec<BlockInfo>>,
    last_indexed_slot: u64,
    incremental_snapshot_interval_slots: u64,
    full_snapshot_interval_slots: u64,
) {
    let snapshot_files = get_snapshot_files_with_metadata(directory_adapter.as_ref())
        .await
        .unwrap();

    // Resume points: the oldest file's end slot anchors the full-snapshot
    // cadence, the newest file's end slot anchors the incremental cadence.
    let mut last_full_snapshot_slot = snapshot_files
        .first()
        .map(|file| file.end_slot)
        .unwrap_or(last_indexed_slot);
    let mut last_snapshot_slot = snapshot_files
        .last()
        .map(|file| file.end_slot)
        .unwrap_or(last_indexed_slot);

    // Serialized blocks accumulated since the last snapshot write.
    let mut byte_buffer = Vec::new();

    pin_mut!(blocks_stream);
    while let Some(blocks) = blocks_stream.next().await {
        for block in blocks {
            let slot = block.metadata.slot;
            // The `== 0` terms widen the interval check by one slot when
            // starting from genesis. NOTE(review): the full-snapshot check
            // keys off `last_indexed_slot == 0` while the incremental check
            // keys off `last_snapshot_slot == 0` — confirm this asymmetry is
            // intentional. Also assumes slot >= last_*_slot; the subtraction
            // would underflow otherwise.
            let write_full_snapshot = slot - last_full_snapshot_slot
                + (last_indexed_slot == 0) as u64
                >= full_snapshot_interval_slots;
            let write_incremental_snapshot = slot - last_snapshot_slot
                + (last_snapshot_slot == 0) as u64
                >= incremental_snapshot_interval_slots;

            // Keep only compression-related transactions in the snapshot.
            let trimmed_block = BlockInfo {
                metadata: block.metadata.clone(),
                transactions: block
                    .transactions
                    .iter()
                    .filter(|tx| is_compression_transaction(tx))
                    .cloned()
                    .collect(),
            };
            let block_bytes = bincode::serialize(&trimmed_block).unwrap();
            byte_buffer.extend(block_bytes);

            if write_incremental_snapshot {
                // File name encodes the inclusive slot range it covers.
                let snapshot_file_path = format!("snapshot-{}-{}", last_snapshot_slot + 1, slot);
                info!("Writing snapshot file: {}", snapshot_file_path);
                let byte_buffer_clone = byte_buffer.clone();
                let byte_stream = stream! {
                    yield Ok(Bytes::from(byte_buffer_clone));
                };
                directory_adapter
                    .as_ref()
                    .write_file(snapshot_file_path, byte_stream)
                    .await
                    .unwrap();
                byte_buffer.clear();
                last_snapshot_slot = slot;
            }
            if write_full_snapshot {
                // Collapse all snapshot files written so far into one.
                merge_snapshots(directory_adapter.clone()).await;
                last_full_snapshot_slot = slot;
            }
        }
    }
}
532
533pub async fn load_byte_stream_from_directory_adapter(
534 directory_adapter: Arc<DirectoryAdapter>,
535) -> impl Stream<Item = Result<Bytes>> + 'static {
536 stream! {
538 let snapshot_files =
539 get_snapshot_files_with_metadata(directory_adapter.as_ref()).await.context("Failed to retrieve snapshot files")?;
540 if snapshot_files.is_empty() {
541 yield Err(anyhow!("No snapshot files found"));
542 }
543
544 yield Ok(Bytes::from(vec![SNAPSHOT_VERSION]));
546
547 let start_slot = snapshot_files.first().map(|file| file.start_slot).unwrap();
548 let end_slot = snapshot_files.last().map(|file| file.end_slot).unwrap();
549
550 let start_slot = start_slot.to_le_bytes();
551 let end_slot = end_slot.to_le_bytes();
552 yield Ok(Bytes::from(start_slot.to_vec()));
553 yield Ok(Bytes::from(end_slot.to_vec()));
554
555 for snapshot_file in snapshot_files {
557 let byte_stream = directory_adapter.read_file(snapshot_file.file.clone()).await;
559 pin_mut!(byte_stream);
560 while let Some(byte) = byte_stream.next().await {
561 yield byte;
562 }
563 }
564 }
565}
566
/// Replays a snapshot as batches of deserialized [`BlockInfo`]s, grouping
/// roughly `TRANSACTIONS_TO_ACCUMULATE` transactions per yielded batch.
pub async fn load_block_stream_from_directory_adapter(
    directory_adapter: Arc<DirectoryAdapter>,
) -> impl Stream<Item = Vec<BlockInfo>> {
    stream! {
        let byte_stream = load_byte_stream_from_directory_adapter(directory_adapter.clone()).await;
        pin_mut!(byte_stream);
        // NOTE(review): header parsing relies on
        // load_byte_stream_from_directory_adapter yielding the version byte
        // and the two slot values as the first three discrete chunks; it
        // would break under arbitrary re-chunking. Confirm before reuse.
        let snapshot_version = byte_stream.next().await.unwrap().unwrap();
        let snapshot_version = snapshot_version[0];


        if snapshot_version != SNAPSHOT_VERSION {
            panic!("Unsupported snapshot version: {}. Please upgrade Photon package", snapshot_version);
        }
        // Skip the start/end slot chunks; the slot range is not needed here.
        for _ in 0..2 {
            byte_stream.next().await.unwrap().unwrap();
        }

        // `reader` buffers raw snapshot bytes; `index` is the offset of the
        // next unparsed block within it.
        let mut reader = Vec::new();
        let mut index = 0;
        let mut accumulated_blocks = Vec::new();
        let mut accumulated_transactions = 0;

        while let Some(bytes) = byte_stream.next().await {
            let bytes = bytes.unwrap();
            reader.extend(&bytes);
            // Only parse while more than CHUNK_SIZE unparsed bytes remain.
            // NOTE(review): this presumes a serialized block is always
            // smaller than CHUNK_SIZE, so a complete block is guaranteed to
            // be present — confirm that invariant holds for large blocks.
            while reader.len() - index > CHUNK_SIZE {
                let block: BlockInfo = bincode::deserialize(&reader[index..]).unwrap();
                // serialized_size tells us how many bytes the block occupied.
                let size = bincode::serialized_size(&block).unwrap() as usize;
                index += size;
                accumulated_transactions += block.transactions.len();
                accumulated_blocks.push(block);
                if accumulated_transactions >= TRANSACTIONS_TO_ACCUMULATE {
                    yield accumulated_blocks;
                    accumulated_blocks = Vec::new();
                    accumulated_transactions = 0;
                }
            }
            // Drop already-parsed bytes to bound memory usage.
            if index > 0 {
                reader.drain(..index);
                index = 0;
            }
        }

        // Stream exhausted: parse the remaining buffered blocks.
        while index < reader.len() {
            let block: BlockInfo = bincode::deserialize(&reader[index..]).unwrap();
            let size = bincode::serialized_size(&block).unwrap() as usize;
            index += size;
            accumulated_transactions += block.transactions.len();
            accumulated_blocks.push(block);
            if accumulated_transactions >= TRANSACTIONS_TO_ACCUMULATE {
                yield accumulated_blocks;
                accumulated_blocks = Vec::new();
                accumulated_transactions = 0;
            }
        }

        // Flush the final partial batch.
        if !accumulated_blocks.is_empty() {
            yield accumulated_blocks;
        }
    }
}
630
631pub async fn create_snapshot_from_byte_stream(
632 byte_stream: impl Stream<Item = Result<Bytes, anyhow::Error>> + std::marker::Send + 'static,
633 directory_adapter: &DirectoryAdapter,
634) -> Result<()> {
635 let mut byte_stream: Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>> =
637 Box::pin(byte_stream);
638
639 let mut byte_buffer = Vec::new();
640 while let Some(byte) = byte_stream.next().await {
641 let byte = byte?;
642 byte_buffer.extend(byte.iter().copied());
643 if byte_buffer.len() > 17 {
645 break;
646 }
647 }
648 let snapshot_version = byte_buffer.remove(0);
650
651 if snapshot_version != SNAPSHOT_VERSION {
652 panic!(
653 "Unsupported snapshot version: {}. Please upgrade Photon package",
654 snapshot_version
655 );
656 }
657 let start_slot_bytes: [u8; 8] = byte_buffer
658 .drain(..8)
659 .collect::<Vec<u8>>()
660 .try_into()
661 .unwrap();
662 let start_slot = u64::from_le_bytes(start_slot_bytes);
663 let end_slot_bytes: [u8; 8] = byte_buffer
664 .drain(..8)
665 .collect::<Vec<u8>>()
666 .try_into()
667 .unwrap();
668 let end_slot = u64::from_le_bytes(end_slot_bytes);
669 let snapshot_name = format!("snapshot-{}-{}", start_slot, end_slot);
670 info!("Creating snapshot: {}", snapshot_name);
671 let byte_stream = stream! {
672 yield Ok(Bytes::from(byte_buffer));
673 while let Some(byte) = byte_stream.next().await {
674 yield byte;
675 }
676 };
677 directory_adapter
678 .write_file(snapshot_name.clone(), byte_stream)
679 .await?;
680
681 info!("Snapshot downloaded successfully to {:?}", snapshot_name);
682 Ok(())
683}