// exon_cram/object_store_fasta_repository_adapter.rs

use std::{collections::HashMap, sync::Arc};

use noodles::fasta::{self, fai, repository::Adapter};
use object_store::{path::Path, ObjectStore};

20pub struct ObjectStoreFastaRepositoryAdapter {
21 object_store: Arc<dyn ObjectStore>,
22 fasta_path: Path,
23 index_records: HashMap<String, fai::Record>,
24 internal_cache: HashMap<String, fasta::Record>,
25}
26
27impl ObjectStoreFastaRepositoryAdapter {
28 pub async fn try_new(
29 object_store: Arc<dyn ObjectStore>,
30 fasta_path: String,
31 ) -> std::io::Result<Self> {
32 let fasta_path = if fasta_path.starts_with("s3://") {
36 let mut parts = fasta_path.split('/');
37 let _ = parts.next();
38 let _ = parts.next();
39 let _bucket = parts.next().unwrap();
40 let key = parts.collect::<Vec<&str>>().join("/");
41
42 let key = key.trim_start_matches('/');
43 let key = key.trim_end_matches('/');
44
45 tracing::info!(key = &key, "Setting key to new key");
46 Path::parse(key)
47 } else {
48 Path::parse(fasta_path)
49 };
50
51 let fasta_path = match fasta_path {
52 Ok(path) => path,
53 Err(e) => {
54 return Err(std::io::Error::new(
55 std::io::ErrorKind::InvalidData,
56 format!("Invalid path: {}", e),
57 ))
58 }
59 };
60
61 let index_path_string = format!("{}.fai", fasta_path);
62 let index_path = Path::from(index_path_string);
63
64 let index_bytes = object_store.get(&index_path).await?.bytes().await?;
65 let index = fai::Reader::new(std::io::Cursor::new(index_bytes)).read_index()?;
66
67 let mut index_records = HashMap::new();
68
69 let records: Vec<fai::Record> = index.into();
70
71 for record in records {
72 let record_name = std::str::from_utf8(record.name()).map_err(|_| {
73 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid name")
74 })?;
75
76 index_records.insert(record_name.to_string(), record);
77 }
78
79 Ok(Self {
80 object_store,
81 fasta_path,
82 index_records,
83 internal_cache: HashMap::new(),
84 })
85 }
86}
87
88impl Adapter for ObjectStoreFastaRepositoryAdapter {
89 fn get(&mut self, name: &[u8]) -> Option<std::io::Result<noodles::fasta::Record>> {
90 let name = if let Ok(name) = std::str::from_utf8(name) {
91 name
92 } else {
93 return Some(Err(std::io::Error::new(
94 std::io::ErrorKind::InvalidData,
95 "Invalid name",
96 )));
97 };
98
99 if let Some(record) = self.internal_cache.get(name) {
100 return Some(Ok(record.clone()));
101 }
102
103 let _index_record = match self.index_records.get(name) {
104 Some(index_record) => index_record,
105 None => return None,
106 };
107
108 tracing::info!(
109 fasta_path = &self.fasta_path.to_string(),
110 "Fetching FASTA record for CRAM Adapter"
111 );
112
113 let sequence_bytes_result = futures::executor::block_on(async {
114 let byte_stream = self
115 .object_store
116 .get(&self.fasta_path)
117 .await?
118 .bytes()
119 .await?;
120
121 Ok(byte_stream)
122 });
123
124 let sequence_bytes = match sequence_bytes_result {
125 Ok(bytes) => bytes,
126 Err(e) => return Some(Err(e)),
127 };
128
129 let cursor = std::io::Cursor::new(sequence_bytes);
130 let mut fasta_reader = fasta::Reader::new(cursor);
131
132 let mut records = fasta_reader.records();
133 let record = records.next().unwrap().unwrap();
134
135 tracing::trace!(name = name, "Inserting record into cache");
136 self.internal_cache.insert(name.to_string(), record.clone());
137
138 Some(Ok(record))
139 }
140}