Skip to main content

jam_rs/
io.rs

use anyhow::{Context, Result};
use std::fs::File;
use std::io::{self, BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};

use crate::format::{ENTRY_SIZE, Entry};
7
8pub fn expand_input_paths(input_paths: &[PathBuf]) -> Result<Vec<PathBuf>> {
9    let mut expanded_paths = Vec::new();
10
11    for path in input_paths {
12        if path.is_dir() {
13            for entry in std::fs::read_dir(path)? {
14                let entry = entry?;
15                let file_path = entry.path();
16
17                if is_sequence_file(&file_path) {
18                    expanded_paths.push(file_path);
19                }
20            }
21        } else if path.is_file() {
22            if is_sequence_file(path) {
23                expanded_paths.push(path.clone());
24            } else {
25                let content = std::fs::read_to_string(path)?;
26                for line in content.lines() {
27                    let file_path = PathBuf::from(line.trim());
28                    if file_path.exists() && is_sequence_file(&file_path) {
29                        expanded_paths.push(file_path);
30                    }
31                }
32            }
33        }
34    }
35
36    if expanded_paths.is_empty() {
37        return Err(anyhow::anyhow!(
38            "No valid sequence files found in input paths"
39        ));
40    }
41
42    expanded_paths.sort();
43    Ok(expanded_paths)
44}
45
/// Returns `true` when `path` looks like a FASTA/FASTQ file, judged purely by
/// its extension, compared case-insensitively.
///
/// Accepted extensions are `fasta`, `fa`, `fas`, `fna`, `fastq` and `fq`. A
/// final `.gz` is looked through, so `reads.fq.gz` qualifies. A bare
/// `reads.gz` or a `notes.txt.gz` does not qualify.
pub fn is_sequence_file(path: &Path) -> bool {
    const SEQUENCE_EXTENSIONS: [&str; 6] = ["fasta", "fa", "fas", "fna", "fastq", "fq"];

    fn lowercase_extension(p: &Path) -> Option<String> {
        p.extension().map(|e| e.to_string_lossy().to_lowercase())
    }

    let Some(outer) = lowercase_extension(path) else {
        return false;
    };

    // For gzipped files, judge by the extension underneath `.gz`. If there is
    // none, fall back to "gz" itself, which is never in the table.
    let effective = if outer == "gz" {
        path.file_stem()
            .and_then(|stem| lowercase_extension(Path::new(stem)))
            .unwrap_or(outer)
    } else {
        outer
    };

    SEQUENCE_EXTENSIONS.contains(&effective.as_str())
}
64
65pub fn read_entries<P: AsRef<Path>>(path: P) -> io::Result<Vec<Entry>> {
66    let file = File::open(path)?;
67    let file_size = file.metadata()?.len() as usize;
68    let entry_count = file_size / ENTRY_SIZE;
69
70    let mut reader = BufReader::with_capacity(8 * 1024 * 1024, file);
71    let mut entries = Vec::with_capacity(entry_count);
72
73    let mut buf = [0u8; ENTRY_SIZE];
74    while reader.read_exact(&mut buf).is_ok() {
75        let hash = u64::from_le_bytes(buf[0..8].try_into().unwrap());
76        let sample_id = u32::from_le_bytes(buf[8..12].try_into().unwrap());
77        entries.push(Entry::new(hash, sample_id));
78    }
79
80    Ok(entries)
81}
82
83pub fn write_entries<P: AsRef<Path>>(path: P, entries: &[Entry]) -> io::Result<()> {
84    let file = File::create(path)?;
85    let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, file);
86    writer.write_all(bytemuck::cast_slice(entries))?;
87    writer.flush()
88}
89
90pub struct EntryWriter {
91    writer: BufWriter<File>,
92    count: u64,
93}
94
95impl EntryWriter {
96    pub fn new<P: AsRef<Path>>(path: P, buffer_size: usize) -> io::Result<Self> {
97        let file = File::create(path)?;
98        Ok(Self {
99            writer: BufWriter::with_capacity(buffer_size, file),
100            count: 0,
101        })
102    }
103
104    pub fn write(&mut self, entry: &Entry) -> io::Result<()> {
105        self.writer.write_all(bytemuck::bytes_of(entry))?;
106        self.count += 1;
107        Ok(())
108    }
109
110    pub fn write_batch(&mut self, entries: &[Entry]) -> io::Result<()> {
111        self.writer.write_all(bytemuck::cast_slice(entries))?;
112        self.count += entries.len() as u64;
113        Ok(())
114    }
115
116    pub fn count(&self) -> u64 {
117        self.count
118    }
119
120    pub fn flush(&mut self) -> io::Result<()> {
121        self.writer.flush()
122    }
123}
124
125pub fn extract_unique_hashes(entries: &[Entry]) -> Vec<u64> {
126    let mut unique = Vec::with_capacity(entries.len() / 10);
127    let mut last_hash: Option<u64> = None;
128
129    for entry in entries {
130        let h = entry.hash;
131        if last_hash != Some(h) {
132            unique.push(h);
133            last_hash = Some(h);
134        }
135    }
136
137    unique
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_entry_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.bin");

        let entries = vec![Entry::new(100, 1), Entry::new(200, 2), Entry::new(300, 3)];

        write_entries(&path, &entries).unwrap();
        let loaded = read_entries(&path).unwrap();
        assert_eq!(entries, loaded);
    }

    #[test]
    fn test_extract_unique_hashes() {
        let entries = vec![
            Entry::new(100, 1),
            Entry::new(100, 2),
            Entry::new(100, 3),
            Entry::new(200, 1),
            Entry::new(300, 1),
            Entry::new(300, 2),
        ];

        let unique = extract_unique_hashes(&entries);
        assert_eq!(unique, vec![100, 200, 300]);
    }

    #[test]
    fn test_entry_writer() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.bin");

        let mut writer = EntryWriter::new(&path, 4096).unwrap();
        writer.write(&Entry::new(10, 1)).unwrap();
        writer.write(&Entry::new(20, 2)).unwrap();
        writer.flush().unwrap();

        assert_eq!(writer.count(), 2);

        let loaded = read_entries(&path).unwrap();
        assert_eq!(loaded, vec![Entry::new(10, 1), Entry::new(20, 2)]);
    }

    /// `write_batch` and `write` can be mixed, and a buffer smaller than the
    /// data still round-trips correctly.
    #[test]
    fn test_entry_writer_batch() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("batch.bin");
        let batch = vec![Entry::new(1, 7), Entry::new(2, 8), Entry::new(3, 9)];

        let mut writer = EntryWriter::new(&path, 16).unwrap();
        writer.write_batch(&batch).unwrap();
        writer.write(&Entry::new(4, 10)).unwrap();
        writer.flush().unwrap();
        assert_eq!(writer.count(), 4);

        let loaded = read_entries(&path).unwrap();
        assert_eq!(loaded.len(), 4);
        assert_eq!(&loaded[..3], &batch[..]);
        assert_eq!(loaded[3], Entry::new(4, 10));
    }

    #[test]
    fn test_is_sequence_file() {
        for name in ["reads.fa", "reads.FASTA", "reads.fq.gz", "dir/g.fna.GZ", "r.fastq"] {
            assert!(is_sequence_file(Path::new(name)), "{name} should be accepted");
        }
        for name in ["notes.txt", "archive.gz", "notes.txt.gz", "README", ".fa"] {
            assert!(!is_sequence_file(Path::new(name)), "{name} should be rejected");
        }
    }
}