redicat 0.4.2

REDICAT - RNA Editing Cellular Assessment Toolkit: A highly parallelized utility for analyzing RNA editing events in single-cell RNA-seq data
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
//! Optimized reference genome operations with caching and better performance

use crate::core::error::{RedicatError, Result};
use bio::io::fasta::{Index, IndexedReader};
use log::{debug, info, warn};
use parking_lot::RwLock;
use rayon::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;

/// Thread-safe reference genome accessor with caching
/// Thread-safe reference genome accessor with caching
pub struct ReferenceGenome {
    // Indexed FASTA reader. Mutex-guarded because bio's IndexedReader needs
    // `&mut self` for fetch/read, so concurrent callers must serialize here.
    reader: parking_lot::Mutex<IndexedReader<std::fs::File>>,
    // Sequence (chromosome) names parsed from the .fai index, in file order.
    sequences: Vec<String>,
    // Cache for frequently accessed positions
    // Keyed by "chrom:pos" strings (1-based pos); values are 'A'/'T'/'C'/'G' or 'N'.
    cache: Arc<RwLock<HashMap<String, char>>>,
    // Soft upper bound on cache entries; a batch of entries is evicted when hit.
    cache_size_limit: usize,
}

// SAFETY(review): these manual impls appear redundant — every field
// (parking_lot::Mutex<_>, Vec<String>, Arc<RwLock<HashMap<..>>>, usize) is a
// composite of Send + Sync types, so the auto traits should already apply.
// Manual `unsafe impl`s are worse than redundant: they would silently keep the
// struct Send/Sync even if a non-thread-safe field (e.g. Rc, raw pointer) were
// added later. Recommend deleting both lines and letting the compiler derive
// the auto traits — TODO confirm the crate still compiles without them.
unsafe impl Send for ReferenceGenome {}
unsafe impl Sync for ReferenceGenome {}

impl ReferenceGenome {
    /// Create a new `ReferenceGenome` instance with the default cache size
    /// (100,000 entries).
    ///
    /// # Errors
    /// Returns an error if the FASTA file or its `.fai` index is missing,
    /// unreadable, or unparseable.
    pub fn new(fasta_path: &str) -> Result<Self> {
        Self::new_with_cache_size(fasta_path, 100_000)
    }

    /// Create a new `ReferenceGenome` instance with the specified cache size.
    ///
    /// Requires a samtools-style `.fai` index file next to the FASTA file
    /// (`<fasta_path>.fai`).
    ///
    /// # Errors
    /// - `FileNotFound` if the FASTA or `.fai` file does not exist or cannot be opened.
    /// - `ReferenceGenome` if the index cannot be parsed.
    pub fn new_with_cache_size(fasta_path: &str, cache_size: usize) -> Result<Self> {
        let index_path = format!("{}.fai", fasta_path);

        if !std::path::Path::new(fasta_path).exists() {
            return Err(RedicatError::FileNotFound(format!(
                "FASTA file not found: {}",
                fasta_path
            )));
        }

        if !std::path::Path::new(&index_path).exists() {
            return Err(RedicatError::FileNotFound(format!(
                "FASTA index file not found: {}. Please create it using: samtools faidx {}",
                index_path, fasta_path
            )));
        }

        let index = Index::from_file(&index_path).map_err(|e| {
            RedicatError::ReferenceGenome(format!("Failed to load FASTA index: {:?}", e))
        })?;

        let reader = IndexedReader::with_index(
            std::fs::File::open(fasta_path).map_err(|e| {
                RedicatError::FileNotFound(format!("Cannot open FASTA file: {}", e))
            })?,
            index,
        );

        let sequences = Self::read_sequence_names(&index_path)?;

        info!(
            "Loaded reference genome with {} sequences, cache size: {}",
            sequences.len(),
            cache_size
        );

        Ok(ReferenceGenome {
            reader: parking_lot::Mutex::new(reader),
            sequences,
            cache: Arc::new(RwLock::new(HashMap::new())),
            cache_size_limit: cache_size,
        })
    }

    /// Parse a `"chrom:pos"` string into `(chrom, 1-based position)`.
    ///
    /// Returns `None` for malformed input: missing/extra `:` separators,
    /// a non-numeric position, or position `0` (positions are 1-based).
    fn parse_genomic_pos(genomic_pos: &str) -> Option<(&str, u64)> {
        let (chrom, pos_str) = genomic_pos.split_once(':')?;
        // Reject inputs with more than one ':' to keep the strict
        // two-field format ("chrom:pos").
        if pos_str.contains(':') {
            return None;
        }
        match pos_str.parse::<u64>() {
            Ok(p) if p > 0 => Some((chrom, p)),
            _ => None,
        }
    }

    /// Map a raw FASTA byte to an uppercase canonical base.
    ///
    /// Anything outside A/T/C/G (after uppercasing) collapses to `'N'`.
    fn normalize_base(raw: u8) -> char {
        let base = (raw as char).to_ascii_uppercase();
        match base {
            'A' | 'T' | 'C' | 'G' => base,
            _ => 'N',
        }
    }

    /// Get the reference base at a specific genomic position ("chrom:pos",
    /// 1-based) with caching.
    ///
    /// Returns `'N'` (never an error) for malformed positions, unknown
    /// chromosomes, out-of-range positions, and non-standard bases, so callers
    /// can treat `'N'` uniformly as "no usable reference base".
    pub fn get_ref_of_pos(&self, genomic_pos: &str) -> Result<char> {
        // Fast path: cache hit under the shared read lock.
        {
            let cache = self.cache.read();
            if let Some(&cached_base) = cache.get(genomic_pos) {
                return Ok(cached_base);
            }
        }

        // Parse genomic position ("chrom:pos").
        let (chrom, pos) = match Self::parse_genomic_pos(genomic_pos) {
            Some(parsed) => parsed,
            None => {
                debug!("Invalid genomic position format: {}", genomic_pos);
                return Ok('N');
            }
        };

        // Validate chromosome exists (linear scan; sequence lists are short).
        if !self.sequences.iter().any(|seq| seq == chrom) {
            warn!("Chromosome {} not found in reference genome", chrom);
            return Ok('N');
        }

        // Fetch from file, then cache the result for subsequent lookups.
        let base = self.fetch_base_from_file(chrom, pos)?;
        self.cache_base(genomic_pos.to_string(), base);

        Ok(base)
    }

    /// Fetch a single base from the indexed FASTA (internal).
    ///
    /// `pos` is 1-based; fetch failures and read failures degrade to `'N'`
    /// (logged at debug level) rather than erroring.
    fn fetch_base_from_file(&self, chrom: &str, pos: u64) -> Result<char> {
        // Guard: positions are 1-based; 0 would underflow `pos - 1` below.
        if pos == 0 {
            debug!("Invalid zero position on chromosome {}", chrom);
            return Ok('N');
        }

        let mut reader = self.reader.lock();
        let mut sequence = Vec::new();

        // bio's fetch takes a 0-based half-open interval: [pos-1, pos).
        if let Err(e) = reader.fetch(chrom, pos - 1, pos) {
            debug!("Failed to fetch position {}:{}: {:?}", chrom, pos, e);
            return Ok('N');
        }
        if let Err(e) = reader.read(&mut sequence) {
            debug!("Failed to read sequence at {}:{}: {:?}", chrom, pos, e);
            return Ok('N');
        }

        if let Some(&raw) = sequence.first() {
            let base_char = (raw as char).to_ascii_uppercase();
            match base_char {
                'A' | 'T' | 'C' | 'G' => Ok(base_char),
                _ => {
                    debug!("Non-standard base at {}:{}: {}", chrom, pos, base_char);
                    Ok('N')
                }
            }
        } else {
            debug!("Empty sequence at {}:{}", chrom, pos);
            Ok('N')
        }
    }

    /// Cache a base, evicting a batch of entries when the size limit is hit.
    fn cache_base(&self, position: String, base: char) {
        let mut cache = self.cache.write();

        // Size management: when full, drop ~10% of entries to make room.
        // NOTE: HashMap iteration order is arbitrary, so eviction removes an
        // arbitrary subset, not the oldest entries — acceptable for this cache.
        if cache.len() >= self.cache_size_limit {
            let to_remove = self.cache_size_limit / 10;
            let keys_to_remove: Vec<String> = cache.keys().take(to_remove).cloned().collect();
            for key in keys_to_remove {
                cache.remove(&key);
            }
        }

        cache.insert(position, base);
    }

    /// Get reference bases for multiple genomic positions by batching reads per chromosome.
    ///
    /// Groups positions by chromosome, fetches each chromosome's relevant region
    /// once (single Mutex acquisition per chromosome), then maps positions to bases
    /// via direct array indexing. This dramatically reduces Mutex contention compared
    /// to per-position fetch calls.
    ///
    /// Invalid positions, unknown chromosomes, and out-of-range offsets yield
    /// `'N'` in the corresponding output slot. Output order matches input order.
    ///
    /// NOTE: the fetched region spans `[min, max]` of the group's positions, so
    /// widely scattered positions on one chromosome can pull a large interval
    /// into memory; prefer reasonably clustered batches.
    pub fn get_multiple_refs_batched(&self, positions: &[String]) -> Result<Vec<char>> {
        if positions.is_empty() {
            return Ok(Vec::new());
        }

        // Parse all positions into (chrom, 1-based pos); remember each input's
        // index so results can be written back in order. Unparseable inputs
        // keep the 'N' default.
        let mut chrom_groups: HashMap<String, Vec<(usize, u64)>> = HashMap::new();
        let mut results = vec!['N'; positions.len()];

        for (idx, pos_str) in positions.iter().enumerate() {
            if let Some((chrom, p)) = Self::parse_genomic_pos(pos_str) {
                chrom_groups.entry(chrom.to_string()).or_default().push((idx, p));
            }
        }

        // Process each chromosome: fetch the region [min, max] once.
        for (chrom, group) in &chrom_groups {
            if !self.sequences.iter().any(|s| s == chrom) {
                continue;
            }

            let min_pos = group.iter().map(|&(_, p)| p).min().unwrap();
            let max_pos = group.iter().map(|&(_, p)| p).max().unwrap();

            // Fetch the region [min_pos-1, max_pos) (0-based half-open for bio crate)
            let mut seq = Vec::new();
            {
                let mut reader = self.reader.lock();
                if reader.fetch(chrom, min_pos - 1, max_pos).is_ok() {
                    let _ = reader.read(&mut seq);
                }
            }

            if seq.is_empty() {
                // Batch fetch failed (e.g. range exceeds chromosome length).
                // Fall back to per-position fetches for this chromosome.
                for &(result_idx, pos) in group {
                    results[result_idx] = self.fetch_base_from_file(chrom, pos).unwrap_or('N');
                }
                continue;
            }

            // Map each position to its base via direct offset, and cache it.
            // (Single pass — the base is normalized exactly once per position.)
            for &(result_idx, pos) in group {
                let offset = (pos - min_pos) as usize;
                if let Some(&raw) = seq.get(offset) {
                    let base = Self::normalize_base(raw);
                    results[result_idx] = base;
                    self.cache_base(format!("{}:{}", chrom, pos), base);
                }
            }
        }

        Ok(results)
    }

    /// Get reference bases for multiple genomic positions with parallel processing.
    ///
    /// Small batches are resolved serially; batches of 100+ positions use
    /// rayon's parallel iterator. Each lookup goes through the shared cache.
    pub fn get_multiple_refs_parallel(&self, positions: &[String]) -> Result<Vec<char>> {
        if positions.is_empty() {
            return Ok(Vec::new());
        }

        // Below this size, rayon's scheduling overhead outweighs the win.
        const PARALLEL_THRESHOLD: usize = 100;

        if positions.len() < PARALLEL_THRESHOLD {
            let results = positions
                .iter()
                .map(|pos| self.get_ref_of_pos(pos).unwrap_or('N'))
                .collect::<Vec<_>>();
            Ok(results)
        } else {
            let results: Vec<char> = positions
                .par_iter()
                .map(|pos| self.get_ref_of_pos(pos).unwrap_or('N'))
                .collect();
            Ok(results)
        }
    }

    /// Get reference bases with chunked processing for very large datasets.
    ///
    /// A `chunk_size` of 0 is treated as 1 instead of panicking
    /// (`slice::chunks` panics on a zero chunk size).
    pub fn get_multiple_refs_chunked(
        &self,
        positions: &[String],
        chunk_size: usize,
    ) -> Result<Vec<char>> {
        if positions.is_empty() {
            return Ok(Vec::new());
        }

        // Guard: chunks(0) would panic.
        let chunk_size = chunk_size.max(1);

        let mut results = Vec::with_capacity(positions.len());

        for chunk in positions.chunks(chunk_size) {
            let chunk_results = self.get_multiple_refs_parallel(chunk)?;
            results.extend(chunk_results);
        }

        Ok(results)
    }

    /// Get the list of sequence names in the reference genome (index order).
    pub fn get_sequence_names(&self) -> &[String] {
        &self.sequences
    }

    /// Validate a genomic position: true iff it resolves to a standard base
    /// (A/T/C/G), false for malformed input, unknown chromosome, or 'N'.
    pub fn validate_position(&self, genomic_pos: &str) -> bool {
        match self.get_ref_of_pos(genomic_pos) {
            Ok(base) => base != 'N',
            Err(_) => false,
        }
    }

    /// Validate multiple positions in parallel. Output order matches input order.
    pub fn validate_positions_parallel(&self, positions: &[String]) -> Vec<bool> {
        positions
            .par_iter()
            .map(|pos| self.validate_position(pos))
            .collect()
    }

    /// Get cache statistics for monitoring: (current size, size limit, usage %).
    ///
    /// Usage is reported as 0.0 when the limit is 0 (avoids NaN/inf).
    pub fn get_cache_stats(&self) -> (usize, usize, f64) {
        let cache = self.cache.read();
        let size = cache.len();
        let limit = self.cache_size_limit;
        let usage = if limit == 0 {
            0.0
        } else {
            size as f64 / limit as f64 * 100.0
        };
        (size, limit, usage)
    }

    /// Clear the cache.
    pub fn clear_cache(&self) {
        let mut cache = self.cache.write();
        cache.clear();
        info!("Reference genome cache cleared");
    }

    /// Preload positions into cache by resolving them (in parallel) and
    /// discarding the results; subsequent lookups become cache hits.
    pub fn preload_positions(&self, positions: &[String]) -> Result<()> {
        info!("Preloading {} positions into cache", positions.len());

        let _results: Vec<char> = positions
            .par_iter()
            .map(|pos| self.get_ref_of_pos(pos).unwrap_or('N'))
            .collect();

        let (cache_size, _, usage) = self.get_cache_stats();
        info!(
            "Preloading complete. Cache size: {}, usage: {:.1}%",
            cache_size, usage
        );

        Ok(())
    }

    /// Read sequence names from a FASTA index (.fai) file.
    ///
    /// The .fai format is tab-separated with the sequence name in column 1.
    /// Blank lines are skipped; a line whose first field is empty is a parse
    /// error; an index yielding no names at all is an `EmptyData` error.
    fn read_sequence_names(index_path: &str) -> Result<Vec<String>> {
        use std::fs::File;
        use std::io::{BufRead, BufReader};

        let file = File::open(index_path).map_err(RedicatError::Io)?;
        let reader = BufReader::new(file);

        let mut sequences = Vec::new();
        for line in reader.lines() {
            let line = line.map_err(RedicatError::Io)?;
            // Skip blank lines (e.g. a trailing newline) rather than
            // recording an empty sequence name.
            if line.trim().is_empty() {
                continue;
            }
            let name = line.split('\t').next().unwrap_or("");
            if name.is_empty() {
                return Err(RedicatError::Parse("Empty line in FASTA index".to_string()));
            }
            sequences.push(name.to_string());
        }

        if sequences.is_empty() {
            return Err(RedicatError::EmptyData(
                "No sequences found in FASTA index".to_string(),
            ));
        }

        Ok(sequences)
    }
}

#[cfg(test)]
mod tests {
    use super::ReferenceGenome;
    use tempfile::tempdir;

    // Build a tiny two-chromosome FASTA (chr1=ATCG, chr2=GCTA) plus a
    // hand-written .fai index, and return the FASTA path.
    //
    // .fai columns: name, seq length, byte offset of first base, bases per
    // line, bytes per line. chr1's bases start at byte 6 (after ">chr1\n");
    // chr2's at byte 17 (after ">chr1\nATCG\n>chr2\n").
    fn create_indexed_fasta() -> std::path::PathBuf {
        let dir = tempdir().unwrap();
        let fasta_path = dir.path().join("ref.fa");
        let fai_path = dir.path().join("ref.fa.fai");

        std::fs::write(&fasta_path, b">chr1\nATCG\n>chr2\nGCTA\n").unwrap();
        std::fs::write(&fai_path, b"chr1\t4\t6\t4\t5\nchr2\t4\t17\t4\t5\n").unwrap();

        // Deliberately leak the temp dir (keep() disables cleanup) so the
        // files outlive this function; the OS reclaims them after the run.
        let persistent = dir.keep();
        persistent.join("ref.fa")
    }

    #[test]
    fn new_reports_missing_reference_files() {
        match ReferenceGenome::new("/definitely/not/exist.fa") {
            Ok(_) => panic!("expected missing file error"),
            Err(err) => assert!(format!("{}", err).contains("FASTA file not found")),
        }
    }

    #[test]
    fn get_ref_and_cache_stats_work() {
        let fasta = create_indexed_fasta();
        let rg = ReferenceGenome::new_with_cache_size(fasta.to_str().unwrap(), 16).unwrap();

        // 1-based positions into chr1 = "ATCG" and chr2 = "GCTA".
        assert_eq!(rg.get_ref_of_pos("chr1:1").unwrap(), 'A');
        assert_eq!(rg.get_ref_of_pos("chr1:2").unwrap(), 'T');
        assert_eq!(rg.get_ref_of_pos("chr2:4").unwrap(), 'A');
        // Past the end of chr1 → 'N'.
        assert_eq!(rg.get_ref_of_pos("chr1:100").unwrap(), 'N');

        let (size, limit, usage) = rg.get_cache_stats();
        // At least the three valid lookups should now be cached.
        assert!(size >= 3);
        assert_eq!(limit, 16);
        assert!(usage >= 0.0);

        rg.clear_cache();
        let (size_after, _, _) = rg.get_cache_stats();
        assert_eq!(size_after, 0);
    }

    #[test]
    fn invalid_positions_return_n_and_validate_false() {
        let fasta = create_indexed_fasta();
        let rg = ReferenceGenome::new(fasta.to_str().unwrap()).unwrap();

        // Malformed / out-of-range inputs all degrade to 'N', never Err.
        assert_eq!(rg.get_ref_of_pos("chr1").unwrap(), 'N');
        assert_eq!(rg.get_ref_of_pos("chr1:0").unwrap(), 'N');
        assert_eq!(rg.get_ref_of_pos("chr1:notanint").unwrap(), 'N');
        assert_eq!(rg.get_ref_of_pos("chr9:1").unwrap(), 'N');

        assert!(!rg.validate_position("chr1:notanint"));
        assert!(!rg.validate_position("chr9:1"));
    }

    #[test]
    fn parallel_and_chunked_match_serial() {
        let fasta = create_indexed_fasta();
        let rg = ReferenceGenome::new(fasta.to_str().unwrap()).unwrap();

        // 120 positions cycling through 1..=4 on both chromosomes — enough to
        // cross the parallel threshold (100) in get_multiple_refs_parallel.
        let mut positions = Vec::new();
        for i in 1..=60 {
            let p = ((i - 1) % 4) + 1;
            positions.push(format!("chr1:{}", p));
            positions.push(format!("chr2:{}", p));
        }

        let serial: Vec<char> = positions
            .iter()
            .map(|p| rg.get_ref_of_pos(p).unwrap())
            .collect();
        let parallel = rg.get_multiple_refs_parallel(&positions).unwrap();
        // Chunk size 13 does not divide 120 evenly — exercises the tail chunk.
        let chunked = rg.get_multiple_refs_chunked(&positions, 13).unwrap();
        let valid = rg.validate_positions_parallel(&positions);

        assert_eq!(serial, parallel);
        assert_eq!(serial, chunked);
        assert!(valid.iter().all(|&x| x));
    }

    #[test]
    fn batched_matches_serial() {
        let fasta = create_indexed_fasta();
        let rg = ReferenceGenome::new(fasta.to_str().unwrap()).unwrap();

        let positions: Vec<String> = vec![
            "chr1:1", "chr1:2", "chr1:3", "chr1:4",
            "chr2:1", "chr2:2", "chr2:3", "chr2:4",
            "chrX:1",      // unknown chrom → N
            "chr1:100",    // out of range → N
            "bad_format",  // no colon → N
        ].into_iter().map(String::from).collect();

        let serial: Vec<char> = positions
            .iter()
            .map(|p| rg.get_ref_of_pos(p).unwrap_or('N'))
            .collect();

        // Clear the cache so the batched path actually hits the file,
        // rather than echoing bases cached by the serial pass above.
        rg.clear_cache();
        let batched = rg.get_multiple_refs_batched(&positions).unwrap();

        assert_eq!(serial, batched);
    }

    #[test]
    fn batched_empty_input() {
        let fasta = create_indexed_fasta();
        let rg = ReferenceGenome::new(fasta.to_str().unwrap()).unwrap();
        let result = rg.get_multiple_refs_batched(&[]).unwrap();
        assert!(result.is_empty());
    }
}