exon_cram/
object_store_fasta_repository_adapter.rs

1// Copyright 2024 WHERE TRUE Technologies.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::HashMap, sync::Arc};
16
17use noodles::fasta::{self, fai, repository::Adapter};
18use object_store::{path::Path, ObjectStore};
19
/// A noodles FASTA repository [`Adapter`] that reads reference sequences from
/// a FASTA file held in an [`ObjectStore`].
///
/// Construct it with `try_new`, which loads the companion `.fai` index up
/// front. Records handed out by the `Adapter::get` implementation are kept in
/// an in-memory cache keyed by sequence name.
pub struct ObjectStoreFastaRepositoryAdapter {
    /// Store that holds both the FASTA file and its `.fai` index.
    object_store: Arc<dyn ObjectStore>,
    /// Location of the FASTA file inside `object_store`.
    fasta_path: Path,
    /// Entries parsed from the `.fai` index, keyed by record name.
    index_records: HashMap<String, fai::Record>,
    /// FASTA records that have already been fetched, keyed by record name.
    internal_cache: HashMap<String, fasta::Record>,
}
26
27impl ObjectStoreFastaRepositoryAdapter {
28    pub async fn try_new(
29        object_store: Arc<dyn ObjectStore>,
30        fasta_path: String,
31    ) -> std::io::Result<Self> {
32        // if the fasta_path is on S3, we need to remove the s3:// and bucket
33        // prefix from the path
34
35        let fasta_path = if fasta_path.starts_with("s3://") {
36            let mut parts = fasta_path.split('/');
37            let _ = parts.next();
38            let _ = parts.next();
39            let _bucket = parts.next().unwrap();
40            let key = parts.collect::<Vec<&str>>().join("/");
41
42            let key = key.trim_start_matches('/');
43            let key = key.trim_end_matches('/');
44
45            tracing::info!(key = &key, "Setting key to new key");
46            Path::parse(key)
47        } else {
48            Path::parse(fasta_path)
49        };
50
51        let fasta_path = match fasta_path {
52            Ok(path) => path,
53            Err(e) => {
54                return Err(std::io::Error::new(
55                    std::io::ErrorKind::InvalidData,
56                    format!("Invalid path: {}", e),
57                ))
58            }
59        };
60
61        let index_path_string = format!("{}.fai", fasta_path);
62        let index_path = Path::from(index_path_string);
63
64        let index_bytes = object_store.get(&index_path).await?.bytes().await?;
65        let index = fai::Reader::new(std::io::Cursor::new(index_bytes)).read_index()?;
66
67        let mut index_records = HashMap::new();
68
69        let records: Vec<fai::Record> = index.into();
70
71        for record in records {
72            let record_name = std::str::from_utf8(record.name()).map_err(|_| {
73                std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid name")
74            })?;
75
76            index_records.insert(record_name.to_string(), record);
77        }
78
79        Ok(Self {
80            object_store,
81            fasta_path,
82            index_records,
83            internal_cache: HashMap::new(),
84        })
85    }
86}
87
88impl Adapter for ObjectStoreFastaRepositoryAdapter {
89    fn get(&mut self, name: &[u8]) -> Option<std::io::Result<noodles::fasta::Record>> {
90        let name = if let Ok(name) = std::str::from_utf8(name) {
91            name
92        } else {
93            return Some(Err(std::io::Error::new(
94                std::io::ErrorKind::InvalidData,
95                "Invalid name",
96            )));
97        };
98
99        if let Some(record) = self.internal_cache.get(name) {
100            return Some(Ok(record.clone()));
101        }
102
103        let _index_record = match self.index_records.get(name) {
104            Some(index_record) => index_record,
105            None => return None,
106        };
107
108        tracing::info!(
109            fasta_path = &self.fasta_path.to_string(),
110            "Fetching FASTA record for CRAM Adapter"
111        );
112
113        let sequence_bytes_result = futures::executor::block_on(async {
114            let byte_stream = self
115                .object_store
116                .get(&self.fasta_path)
117                .await?
118                .bytes()
119                .await?;
120
121            Ok(byte_stream)
122        });
123
124        let sequence_bytes = match sequence_bytes_result {
125            Ok(bytes) => bytes,
126            Err(e) => return Some(Err(e)),
127        };
128
129        let cursor = std::io::Cursor::new(sequence_bytes);
130        let mut fasta_reader = fasta::Reader::new(cursor);
131
132        let mut records = fasta_reader.records();
133        let record = records.next().unwrap().unwrap();
134
135        tracing::trace!(name = name, "Inserting record into cache");
136        self.internal_cache.insert(name.to_string(), record.clone());
137
138        Some(Ok(record))
139    }
140}