fxsplit/lib.rs
1#![allow(clippy::while_let_on_iterator)]
2#![allow(clippy::doc_overindented_list_items)]
3
4//! Core module for splitting a .fa/.fq file into chunks
5//! Alejandro Gonzales-Irribarren, 2025
6//!
7//! This module contains the main function for splitting .fa/.fq files
8//! based on custom requirements in parallel.
9//!
//! In short, the module accepts any .fa or .fq file and processes the
//! reads or sequences inside it in parallel when possible. Compressed
//! (gzipped) files are also accepted. The user can specify whether the
//! split should be based on a fixed chunk size (records per file), a
//! number of output files, or one file per FASTA header, as well as the
//! amount of parallelism to use.
16
17use std::{
18 collections::HashSet,
19 fs::{create_dir_all, File},
20 io::{BufRead, BufReader, BufWriter, Read, Write},
21 os::unix::fs::symlink,
22 path::{Path, PathBuf},
23 sync::Arc,
24};
25
26use anyhow::Result;
27use flate2::read::MultiGzDecoder;
28use flate2::write::GzEncoder;
29use flate2::Compression;
30use memchr::memchr_iter;
31use memmap2::Mmap;
32use rayon::prelude::*;
33
/// Byte that opens a FASTA header line (`>`); searched for with `memchr`.
const FA_NEEDLE: u8 = b'>';
/// Header count above which `--headers` mode logs a warning about the
/// number of output files it is about to create.
const HEADER_SPLIT_WARN_THRESHOLD: usize = 100_000;
36
37pub mod cli;
38use cli::Args;
39
/// Dispatches file processing based on the input file name's suffix.
///
/// The file name of `$file` is compared against each `suffix` with
/// `str::ends_with`, in the order the arms are listed. **Every** arm whose
/// suffix matches runs its action. The suffixes given to one invocation
/// should therefore not overlap (e.g. do not list both `"gz"` and `"fa.gz"`).
/// Matching is done on the raw name without requiring a `.` separator, so
/// `"fa"` also matches a name such as `sofa`. If no arm matches, the macro
/// returns early from the enclosing function with an `anyhow` error naming
/// the file.
///
/// `$file` is evaluated exactly once and is only borrowed.
///
/// # Arguments
///
/// * `$file` - An expression evaluating to a `Path`/`PathBuf` (or a reference
///   to one), representing the input file.
/// * `{ suffix => action, ... }` - Literal suffixes to match against the end of
///   the file name, each paired with the expression to run when it matches.
///
/// # Non-UTF-8 file names
///
/// A file name that is not valid UTF-8 (or a path with no file name at all)
/// is treated as the empty string. It therefore matches no non-empty suffix
/// and produces the "unrecognized file format" error.
///
/// # Errors
///
/// Returns (via `anyhow::bail!`) from the enclosing function when no suffix
/// matches, so that function must return `anyhow::Result<_>`.
///
/// # Example
///
/// ```rust, ignore
/// use std::path::PathBuf;
/// use anyhow::Result;
///
/// // Assume these functions exist for the example
/// fn process_fasta(path: &PathBuf) -> Result<()> { /* ... */ Ok(()) }
/// fn process_fastq(path: &PathBuf) -> Result<()> { /* ... */ Ok(()) }
///
/// let my_file = PathBuf::from("data/sequences.fa.gz");
///
/// dispatch!(&my_file, {
///     "fa.gz" => process_fasta(&my_file)?,
///     "fq.gz" => process_fastq(&my_file)?,
///     "fa" => process_fasta(&my_file)?,
/// });
/// ```
#[macro_export]
macro_rules! dispatch {
    ($file:expr, { $($suffix:literal => $action:expr),* $(,)?}) => {{
        // Borrow the path once so `$file` is not evaluated a second time for
        // the error message (macro locals are hygienic, so these names cannot
        // clash with identifiers used inside `$action`).
        let path = &$file;
        let name = path
            .file_name()
            .and_then(|name| name.to_str())
            .unwrap_or_default();
        let mut matched = false;
        $(
            if name.ends_with($suffix) {
                matched = true;
                $action;
            }
        )*
        if !matched {
            anyhow::bail!("ERROR: unrecognized file format: {}", path.display());
        }
    }};
}
105
/// Reports whether the final component of `path` ends with `suffix`.
///
/// Only the file name is inspected, never the directories above it. A path
/// without a file name (e.g. `/` or an empty path), or whose file name is not
/// valid UTF-8, never matches.
///
/// # Arguments
///
/// * `path` - The path whose file name is tested
/// * `suffix` - Text the file name must end with (e.g. `"fa.gz"`, `"fasta"`)
///
/// # Returns
///
/// * `true` when the UTF-8 file name ends with `suffix`, `false` otherwise
///
/// # Example
///
/// ```rust, ignore
/// use std::path::Path;
///
/// assert!(path_ends_with(Path::new("data/sequences.fa.gz"), "fa.gz"));
/// assert!(!path_ends_with(Path::new("data/sequences.fa.gz"), "fq.gz"));
/// ```
fn path_ends_with(path: &Path, suffix: &str) -> bool {
    let Some(file_name) = path.file_name().and_then(|os_name| os_name.to_str()) else {
        return false;
    };
    file_name.ends_with(suffix)
}
134
135/// Determines the output file extension for non-gzipped FASTA files.
136///
137/// # Arguments
138///
139/// * `path` - The input file path
140///
141/// # Returns
142///
143/// * `"fasta"` if the path ends with "fasta", otherwise `"fa"`
144///
145/// # Example
146///
147/// ```rust, ignore
148/// use std::path::Path;
149///
150/// let ext = fasta_output_extension(&Path::new("sequences.fasta"));
151/// assert_eq!(ext, "fasta");
152/// ```
153fn fasta_output_extension(path: &Path) -> &'static str {
154 if path_ends_with(path, "fasta") {
155 "fasta"
156 } else {
157 "fa"
158 }
159}
160
161/// Determines the output file extension for gzipped FASTA files.
162///
163/// # Arguments
164///
165/// * `path` - The input file path
166///
167/// # Returns
168///
169/// * `"fasta.gz"` if the path ends with "fasta.gz", otherwise `"fa.gz"`
170///
171/// # Example
172///
173/// ```rust, ignore
174/// use std::path::Path;
175///
176/// let ext = fasta_gz_output_extension(&Path::new("sequences.fasta.gz"));
177/// assert_eq!(ext, "fasta.gz");
178/// ```
179fn fasta_gz_output_extension(path: &Path) -> &'static str {
180 if path_ends_with(path, "fasta.gz") {
181 "fasta.gz"
182 } else {
183 "fa.gz"
184 }
185}
186
187/// Checks if the input file is gzipped based on its extension.
188///
189/// # Arguments
190///
191/// * `path` - The input file path
192///
193/// # Returns
194///
195/// * `true` if the path ends with ".gz", `false` otherwise
196///
197/// # Example
198///
199/// ```rust, ignore
200/// use std::path::Path;
201///
202/// assert!(fastq_is_gz(&Path::new("sequences.fq.gz")));
203/// assert!(!fastq_is_gz(&Path::new("sequences.fq")));
204/// ```
205fn fastq_is_gz(path: &Path) -> bool {
206 path_ends_with(path, ".gz")
207}
208
209/// Finds all FASTA header positions in the given data.
210///
211/// Scans the byte array for the FASTA header marker (`>`) and returns
212/// the byte positions where headers start. Filters to ensure headers
213/// are at the start of the file or after a newline.
214///
215/// # Arguments
216///
217/// * `data` - The raw file content as bytes
218///
219/// # Returns
220///
221/// * A `Vec<usize>` containing byte positions of all FASTA headers
222///
223/// # Example
224///
225/// ```rust, ignore
226/// let data = b">header1\nACGT\n>header2\nTGCA";
227/// let headers = find_fasta_headers(data);
228/// assert_eq!(headers, vec![0, 12]);
229/// ```
230fn find_fasta_headers(data: &[u8]) -> Vec<usize> {
231 memchr_iter(FA_NEEDLE, data)
232 .filter(|&i| i == 0 || data.get(i - 1) == Some(&b'\n'))
233 .collect()
234}
235
/// Extracts the record ID from the FASTA header that starts at `header_start`.
///
/// The ID is the first whitespace-delimited token after the `>` marker.
/// Leading spaces or tabs are skipped, so `"> seq1 desc"` yields `"seq1"`,
/// matching how common FASTA parsers read IDs. A trailing `\r` from CRLF
/// files is never part of the ID. Invalid UTF-8 is replaced lossily.
///
/// # Arguments
///
/// * `data` - The raw file content as bytes
/// * `header_start` - The byte position of the header's `>` marker
///
/// # Returns
///
/// * The extracted ID, or `"record"` when the header is truncated or has no
///   ID token
///
/// # Example
///
/// ```rust, ignore
/// let data = b">seq1 description here\nACGT";
/// assert_eq!(extract_fasta_id(data, 0), "seq1");
/// ```
fn extract_fasta_id(data: &[u8], header_start: usize) -> String {
    const FALLBACK_ID: &str = "record";

    // Skip the `>` marker; a header at (or past) the very end has no ID.
    let start = header_start.saturating_add(1);
    let rest = match data.get(start..) {
        Some(rest) if !rest.is_empty() => rest,
        _ => return FALLBACK_ID.to_string(),
    };

    let line_end = rest.iter().position(|&b| b == b'\n').unwrap_or(rest.len());
    let line = &rest[..line_end];

    // `\r`, spaces and tabs are all ASCII whitespace. Splitting on it both
    // drops a CRLF line ending and skips whitespace between `>` and the ID.
    match line
        .split(|b| b.is_ascii_whitespace())
        .find(|token| !token.is_empty())
    {
        Some(token) => String::from_utf8_lossy(token).into_owned(),
        None => FALLBACK_ID.to_string(),
    }
}
282
/// Turns a raw FASTA ID into a string that is safe to use as a file name.
///
/// ASCII letters, digits, `_`, `-` and `.` are kept. Every other character,
/// including any non-ASCII one, becomes `_`. Runs of `_` collapse into one,
/// and leading/trailing `_` are removed. If nothing is left, `"record"` is
/// returned.
///
/// # Arguments
///
/// * `id` - The raw FASTA ID to sanitize
///
/// # Returns
///
/// * The sanitized name (never empty)
///
/// # Example
///
/// ```rust, ignore
/// assert_eq!(sanitize_fasta_id("seq_1 description"), "seq_1_description");
/// ```
fn sanitize_fasta_id(id: &str) -> String {
    let mut cleaned = String::with_capacity(id.len());

    for c in id.chars() {
        let allowed = c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.';
        let c = if allowed { c } else { '_' };
        // Never emit two underscores in a row.
        if c == '_' && cleaned.ends_with('_') {
            continue;
        }
        cleaned.push(c);
    }

    match cleaned.trim_matches('_') {
        "" => String::from("record"),
        core => core.to_owned(),
    }
}
333
334/// Generates unique filenames for each FASTA header in the input.
335///
336/// Creates filenames based on sanitized FASTA IDs, handling duplicates
337/// by appending numeric suffixes. Each filename includes the specified
338/// suffix and file extension.
339///
340/// # Arguments
341///
342/// * `data` - The raw file content as bytes
343/// * `headers` - Vector of byte positions for each header
344/// * `suffix` - Optional suffix to append to filenames
345/// * `extension` - File extension to use (e.g., "fa", "fasta.gz")
346///
347/// # Returns
348///
349/// * A `Vec<String>` of unique filenames, one per header
350///
351/// # Example
352///
353/// ```rust, ignore
354/// let data = b">seq1\nACGT\n>seq2\nTGCA";
355/// let headers = vec![0, 12];
356/// let names = make_unique_header_filenames(data, &headers, "part", "fa");
357/// assert_eq!(names, vec!["seq1_part.fa", "seq2_part.fa"]);
358/// ```
359fn make_unique_header_filenames(
360 data: &[u8],
361 headers: &[usize],
362 suffix: &str,
363 extension: &str,
364) -> Vec<String> {
365 let mut used = HashSet::with_capacity(headers.len());
366 let mut names = Vec::with_capacity(headers.len());
367
368 for &header_start in headers {
369 let raw_id = extract_fasta_id(data, header_start);
370 let sanitized = sanitize_fasta_id(&raw_id);
371
372 let base_name = if suffix.is_empty() {
373 sanitized
374 } else {
375 format!("{sanitized}_{suffix}")
376 };
377
378 let mut candidate = base_name.clone();
379 let mut duplicate_idx = 1usize;
380 while used.contains(&candidate) {
381 candidate = format!("{base_name}_{duplicate_idx}");
382 duplicate_idx += 1;
383 }
384
385 used.insert(candidate.clone());
386 names.push(format!("{candidate}.{extension}"));
387 }
388
389 names
390}
391
392/// Logs a warning if the number of output files would be excessive.
393///
394/// Warns the user when splitting by header would create more than
395/// 100,000 output files, as this may cause filesystem issues.
396///
397/// # Arguments
398///
399/// * `num_headers` - The number of headers (and thus output files)
400/// * `input` - The input file path for context in the warning
401///
402/// # Example
403///
404/// ```rust, ignore
405/// use std::path::Path;
406///
407/// warn_if_many_header_outputs(150000, &Path::new("input.fa"));
408/// // Logs warning if logging is enabled
409/// ```
410fn warn_if_many_header_outputs(num_headers: usize, input: &Path) {
411 if num_headers > HEADER_SPLIT_WARN_THRESHOLD {
412 log::warn!(
413 "WARNING: input {} contains {} FASTA headers; --headers will create {} output files",
414 input.display(),
415 num_headers,
416 num_headers
417 );
418 }
419}
420
421/// Calculates byte regions for each chunk based on the split mode.
422///
423/// # Arguments
424///
425/// * `headers` - Vector of byte positions for each FASTA header
426/// * `data_len` - Total length of the data in bytes
427/// * `mode` - The splitting mode (ChunkSize, NumFiles, or FileHeader)
428///
429/// # Returns
430///
431/// * A `Result<Vec<ChunkRegion>>` containing start/end byte positions
432///
433/// # Errors
434///
435/// Returns an error if chunk size or number of files is 0
436///
437/// # Example
438///
439/// ```rust, ignore
440/// use crate::{SplitMode, ChunkRegion};
441///
442/// let headers = vec![0, 50, 100, 150];
443/// let regions = chunk_regions_for_mode(&headers, 200, SplitMode::ChunkSize(2)).unwrap();
444/// assert_eq!(regions.len(), 2);
445/// ```
446fn chunk_regions_for_mode(
447 headers: &[usize],
448 data_len: usize,
449 mode: SplitMode,
450) -> Result<Vec<ChunkRegion>> {
451 match mode {
452 SplitMode::ChunkSize(records_per_file) => {
453 if records_per_file == 0 {
454 anyhow::bail!("ERROR: --chunks must be greater than 0");
455 }
456
457 let nchunks = headers.len().div_ceil(records_per_file);
458
459 Ok((0..nchunks)
460 .map(|i| {
461 let start = *headers.get(i * records_per_file).unwrap_or(&data_len);
462 let end = *headers.get((i + 1) * records_per_file).unwrap_or(&data_len);
463 ChunkRegion { start, end }
464 })
465 .collect())
466 }
467 SplitMode::NumFiles(files) => {
468 if files == 0 {
469 anyhow::bail!("ERROR: --files must be greater than 0");
470 }
471
472 // let records_per_file = (headers.len() + files - 1) / files;
473 let records_per_file = headers.len().div_ceil(files);
474
475 Ok((0..files)
476 .map(|i| {
477 let start_idx = i * records_per_file;
478 let end_idx = ((i + 1) * records_per_file).min(headers.len());
479 let start = *headers.get(start_idx).unwrap_or(&data_len);
480 let end = *headers.get(end_idx).unwrap_or(&data_len);
481 ChunkRegion { start, end }
482 })
483 .collect())
484 }
485 SplitMode::FileHeader => Ok((0..headers.len())
486 .map(|i| {
487 let start = headers[i];
488 let end = *headers.get(i + 1).unwrap_or(&data_len);
489 ChunkRegion { start, end }
490 })
491 .collect()),
492 }
493}
494
495/// Splits large sequencing files (FASTA, FASTQ, gzipped versions) into smaller chunks or files.
496///
497/// This is the main entry point for the `iso-split` functionality. It parses
498/// command-line arguments, determines the input file type based on its extension,
499/// and dispatches to the appropriate splitting function (`split_fa`, `split_fa_gz`, `split_fq`).
500///
501/// The splitting logic depends on the `SplitMode` specified in the `Args`
502/// (either by a fixed number of records per chunk/file or by a target number of output files).
503///
504/// # Arguments
505///
506/// * `args` - A `Vec<String>` representing the command-line arguments passed to the program.
507///
508/// # Returns
509///
510/// * `Result<()>` - An `Ok(())` on successful completion, or an `anyhow::Error` if
511/// argument parsing fails, the file format is unrecognized, or
512/// any splitting operation encounters an error.
513///
514/// # Errors
515///
516/// * Returns an error if argument parsing (`cli::Args::from`) fails.
517/// * Returns an error if the input file's suffix is not recognized by the `dispatch!` macro.
518/// * Returns any error propagated from the called splitting functions (`split_fa`, etc.).
519///
520/// # Example
521///
522/// ```rust, ignore
523/// fn main() -> Result<()> {
524/// lib_iso_split(vec!["--file".to_string(), "input.fasta.gz".to_string(), "--chunk-size".to_string(), "1000".to_string()])
525/// Ok(())
526/// }
527/// ```
528pub fn lib_iso_split(args: Vec<String>) -> Result<()> {
529 let args = cli::Args::from(args);
530
531 dispatch!(&args.file, {
532 "fa.gz" => split_fa_gz(&args)?,
533 "fasta.gz" => split_fa_gz(&args)?,
534 "fq.gz" => split_fq(&args)?,
535 "fastq.gz" => split_fq(&args)?,
536 "fq" => split_fq(&args)?,
537 "fastq" => split_fq(&args)?,
538 "fa" => split_fa(&args)?,
539 "fasta" => split_fa(&args)?,
540 });
541
542 Ok(())
543}
544
545/// Splits a non-gzipped FASTA file into multiple smaller FASTA files.
546///
547/// This function reads a FASTA file, identifies the start positions of all records
548/// using `memchr_iter` to find `FA_NEEDLE` (typically `>`). It then divides the
549/// file's content (memory-mapped for efficiency) into chunks based on the
550/// specified `SplitMode` (either `ChunkSize` or `NumFiles`). Each chunk is then
551/// written to a new output file within the designated output directory, utilizing
552/// a Rayon thread pool for parallel processing.
553///
554/// # Arguments
555///
556/// * `args` - A reference to an `Args` struct containing the input file path,
557/// output directory, number of threads, splitting mode, and an optional suffix.
558///
559/// # Returns
560///
561/// * `Result<()>` - An `Ok(())` on successful completion, or an `anyhow::Error` if
562/// any operation (file opening, memory mapping, directory creation,
563/// writing to files, or thread pool building) fails.
564///
565/// # Errors
566///
567/// * Returns an error if the input FASTA file cannot be opened or memory-mapped.
568/// * Returns an error if no FASTA records are found in the input file.
569/// * Returns an error if the output directory cannot be created.
570/// * Returns an error if `SplitMode::NumFiles` is 0.
571/// * Returns any `std::io::Error` during file writing.
572///
573/// # Parallelism
574///
575/// This function uses `rayon` for parallel processing of chunks, improving
576/// performance for large files. The number of threads is configured via `args.threads`.
577///
578/// # Example
579///
580/// ```rust, ignore
581/// use anyhow::Result;
582/// use std::path::PathBuf;
583/// // Assuming Args and SplitMode are defined as in lib_iso_split example
584///
585/// fn main() -> Result<()> {
586/// let args = cli::Args {
587/// file: PathBuf::from("input.fa"),
588/// outdir: PathBuf::from("fa_chunks"),
589/// threads: 4,
590/// suffix: Some("part".to_string()),
591/// mode_chunk_size: Some(100), // Split into chunks of 100 records
592/// mode_num_files: None,
593/// };
594/// // split_fa(&args)?;
595/// println!("Successfully split FASTA file.");
596/// Ok(())
597/// }
598/// ```
599pub fn split_fa(args: &Args) -> Result<()> {
600 log::info!("INFO: running in FASTA mode with args: {:?}", &args);
601
602 let pool = rayon::ThreadPoolBuilder::new()
603 .num_threads(args.threads)
604 .build()?;
605
606 let file = File::open(&args.file)?;
607 let mmap = unsafe { Mmap::map(&file)? };
608 let data = Arc::new(mmap);
609
610 let header = find_fasta_headers(data.as_ref());
611 if header.is_empty() {
612 anyhow::bail!("ERROR: No FASTA records found");
613 }
614
615 let mode = args.mode()?;
616 let suffix = args.suffix.clone().unwrap_or_default();
617 let extension = fasta_output_extension(&args.file);
618 create_dir_all(&args.outdir)?;
619
620 if matches!(mode, SplitMode::FileHeader) {
621 warn_if_many_header_outputs(header.len(), &args.file);
622 let filenames = make_unique_header_filenames(data.as_ref(), &header, &suffix, extension);
623
624 pool.install(|| {
625 (0..header.len())
626 .into_par_iter()
627 .try_for_each(|i| -> Result<()> {
628 let start = header[i];
629 let end = *header.get(i + 1).unwrap_or(&data.len());
630 let output_file = PathBuf::from(&args.outdir).join(&filenames[i]);
631 let mut writer = BufWriter::new(File::create(output_file)?);
632 writer.write_all(&data[start..end])?;
633 writer.flush()?;
634 Ok(())
635 })
636 })?;
637
638 return Ok(());
639 }
640
641 let chunks = chunk_regions_for_mode(&header, data.len(), mode)?;
642 let prefix = match mode {
643 SplitMode::NumFiles(_) => "part",
644 _ => "chunk",
645 };
646
647 pool.install(|| {
648 chunks
649 .into_par_iter()
650 .enumerate()
651 .try_for_each(|(i, chunk)| -> Result<()> {
652 let output_file = PathBuf::from(&args.outdir)
653 .join(format!("tmp_{prefix}_{:03}_{suffix}.{extension}", i));
654 let mut writer = BufWriter::new(File::create(output_file)?);
655 writer.write_all(&data[chunk.start..chunk.end])?;
656 writer.flush()?;
657 Ok(())
658 })
659 })?;
660
661 Ok(())
662}
663
664/// Splits a gzipped FASTA file (`.fa.gz` or `.fasta.gz`) into multiple smaller gzipped FASTA files.
665///
666/// This function first decompresses the entire gzipped input file into memory.
667/// It then identifies the start positions of all FASTA records using `memchr_iter`.
668/// The decompressed data is divided into chunks based on the `SplitMode` (either
669/// `ChunkSize` for records per file or `NumFiles` for a target number of files).
670/// Each chunk is then individually compressed and written to a new gzipped output file
671/// within the specified output directory, leveraging a global Rayon thread pool for parallel execution.
672///
673/// A special case is handled: if the total number of records is less than or equal to
674/// the `records_per_file` when in `ChunkSize` mode, it creates a symlink to the original file
675/// instead of splitting, to avoid unnecessary work.
676///
677/// # Arguments
678///
679/// * `args` - A reference to an `Args` struct containing the input file path,
680/// output directory, number of threads, splitting mode, and an optional suffix.
681///
682/// # Returns
683///
684/// * `Result<()>` - An `Ok(())` on successful completion, or an `anyhow::Error` if
685/// any operation (file opening, decompression, directory creation,
686/// writing to files, or thread pool building) fails.
687///
688/// # Errors
689///
690/// * Returns an error if the input gzipped FASTA file cannot be opened or decompressed.
691/// * Returns an error if no FASTA records are found in the decompressed data.
692/// * Returns an error if the output directory cannot be created.
693/// * Returns an error if `SplitMode::NumFiles` is 0.
694/// * Returns any `std::io::Error` during file writing or compression.
695///
696/// # Parallelism
697///
698/// This function uses `rayon` for parallel processing of chunks, improving
699/// performance for large files. It builds a global thread pool with `args.threads`.
700///
701/// # Example
702///
703/// ```rust, ignore
704/// use anyhow::Result;
705/// use std::path::PathBuf;
706/// // Assuming Args and SplitMode are defined as in lib_iso_split example
707///
708/// fn main() -> Result<()> {
709/// let args = cli::Args {
710/// file: PathBuf::from("input.fa.gz"),
711/// outdir: PathBuf::from("fa_gz_chunks"),
712/// threads: 4,
713/// suffix: Some("part".to_string()),
714/// mode_num_files: Some(10), // Split into 10 output files
715/// mode_chunk_size: None,
716/// };
717/// // split_fa_gz(&args)?;
718/// println!("Successfully split gzipped FASTA file.");
719/// Ok(())
720/// }
721/// ```
722pub fn split_fa_gz(args: &Args) -> Result<()> {
723 log::info!("INFO: running in FASTA.GZ mode with args: {:?}", &args);
724
725 let pool = rayon::ThreadPoolBuilder::new()
726 .num_threads(args.threads)
727 .build()?;
728
729 let mut gz = MultiGzDecoder::new(File::open(&args.file)?);
730 let mut decompressed = Vec::new();
731 gz.read_to_end(&mut decompressed)?;
732 let data = Arc::new(decompressed);
733
734 let header = find_fasta_headers(data.as_ref());
735 log::info!(
736 "INFO: Found {} records in {}",
737 header.len(),
738 &args.file.display()
739 );
740
741 if header.is_empty() {
742 log::error!("ERROR: No FASTA records found in decompressed data");
743 anyhow::bail!("ERROR: No FASTA records found in decompressed data");
744 }
745
746 let mode = args.mode()?;
747 let suffix = args.suffix.clone().unwrap_or_default();
748 let extension = fasta_gz_output_extension(&args.file);
749 create_dir_all(&args.outdir)?;
750
751 if let SplitMode::ChunkSize(records_per_file) = mode {
752 if records_per_file == 0 {
753 anyhow::bail!("ERROR: --chunks must be greater than 0");
754 }
755
756 if header.len() <= records_per_file {
757 let outpath =
758 PathBuf::from(&args.outdir).join(format!("tmp_chunk_000_{suffix}.{extension}"));
759 std::fs::create_dir_all(&args.outdir)?;
760 log::warn!(
761 "Only {} records found, <= --chunks {}, creating symlink to original file...",
762 header.len(),
763 records_per_file
764 );
765
766 if args.copy {
767 std::fs::copy(&args.file, &outpath)?;
768 } else {
769 symlink(&args.file, &outpath)?;
770 }
771
772 return Ok(());
773 }
774 }
775
776 if matches!(mode, SplitMode::FileHeader) {
777 warn_if_many_header_outputs(header.len(), &args.file);
778 let filenames = make_unique_header_filenames(data.as_ref(), &header, &suffix, extension);
779
780 pool.install(|| {
781 (0..header.len())
782 .into_par_iter()
783 .try_for_each(|i| -> Result<()> {
784 let start = header[i];
785 let end = *header.get(i + 1).unwrap_or(&data.len());
786 let output_file = PathBuf::from(&args.outdir).join(&filenames[i]);
787 let file = File::create(output_file)?;
788 let writer = BufWriter::new(file);
789 let mut encoder =
790 flate2::write::GzEncoder::new(writer, flate2::Compression::fast());
791 encoder.write_all(&data[start..end])?;
792 encoder.finish()?;
793 Ok(())
794 })
795 })?;
796
797 return Ok(());
798 }
799
800 let chunks = chunk_regions_for_mode(&header, data.len(), mode)?;
801 let prefix = match mode {
802 SplitMode::NumFiles(_) => "part",
803 _ => "chunk",
804 };
805
806 pool.install(|| {
807 chunks
808 .into_par_iter()
809 .enumerate()
810 .try_for_each(|(i, chunk)| -> Result<()> {
811 let output_file = PathBuf::from(&args.outdir)
812 .join(format!("tmp_{prefix}_{:03}_{suffix}.{extension}", i));
813 let file = File::create(output_file)?;
814 let writer = BufWriter::new(file);
815 let mut encoder =
816 flate2::write::GzEncoder::new(writer, flate2::Compression::fast());
817 encoder.write_all(&data[chunk.start..chunk.end])?;
818 encoder.finish()?;
819 Ok(())
820 })
821 })?;
822
823 Ok(())
824}
825
/// How an input file is divided into output files.
///
/// - `ChunkSize(n)`: each output file holds at most `n` records.
/// - `NumFiles(n)`: the records are spread over `n` output files. For FASTA
///   inputs each file receives `ceil(records / n)` consecutive records, so the
///   last files may receive fewer records or none at all. FASTQ inputs are
///   distributed by `split_fastq_by_num_files`.
/// - `FileHeader`: one output file per FASTA record, named after the record
///   ID. Only FASTA/FASTA.GZ inputs support it; `split_fq` rejects it.
///
/// A value of `0` for `ChunkSize` or `NumFiles` is rejected with an error when
/// splitting.
///
/// # Example
///
/// ```rust, ignore
/// use fxsplit::SplitMode;
///
/// let mode = SplitMode::ChunkSize(1000);
/// let mode = SplitMode::NumFiles(10);
/// let mode = SplitMode::FileHeader;
/// ```
#[derive(Debug, Clone, Copy)]
pub enum SplitMode {
    /// At most N records per output file.
    ChunkSize(usize),
    /// K output files.
    NumFiles(usize),
    /// One output file per FASTA header.
    FileHeader,
}
854
// public structs
/// Half-open byte range `[start, end)` covering one output chunk.
///
/// Regions are produced by `chunk_regions_for_mode`. They are used to slice
/// the (decompressed) FASTA buffer as `&data[region.start..region.end]` when
/// each output file is written. A region with `start == end` is empty.
///
/// # Example
///
/// ```rust, ignore
/// use fxsplit::ChunkRegion;
///
/// let region = ChunkRegion { start: 1, end: 43 };
/// assert_eq!(43, region.end);
/// ```
#[derive(Debug, Clone)]
pub struct ChunkRegion {
    /// Byte offset of the first byte in the chunk (inclusive).
    pub start: usize,
    /// Byte offset one past the last byte in the chunk (exclusive).
    pub end: usize,
}
874
875/// Splits a gzipped FASTQ file (`.fq.gz` or `.fastq.gz`) into multiple smaller gzipped FASTQ files.
876///
877/// This function acts as a dispatcher for FASTQ splitting, determining the
878/// records-per-file based on the `SplitMode` (`ChunkSize` or `NumFiles`).
879/// If splitting by `NumFiles`, it first counts all records in the input file
880/// to calculate the appropriate `records_per_file` for even distribution.
881/// It then delegates the actual splitting and writing to `split_by_chunk_size`.
882///
883/// # Arguments
884///
885/// * `args` - A reference to an `Args` struct containing the input file path,
886/// output directory, number of threads (though not directly used here,
887/// passed to underlying functions), splitting mode, and an optional suffix.
888///
889/// # Returns
890///
891/// * `anyhow::Result<()>` - An `Ok(())` on successful completion, or an `anyhow::Error` if
892/// the splitting mode is invalid, record counting fails, or
893/// `split_by_chunk_size` encounters an error.
894///
895/// # Errors
896///
897/// * Returns an error if `args.mode()` returns an error (e.g., no split mode specified).
898/// * Returns an error if `count_fastq_records` fails.
899/// * Propagates any error from `split_by_chunk_size`.
900///
901/// # Example
902///
903/// ```rust, ignore
904/// use anyhow::Result;
905/// use std::path::PathBuf;
906/// // Assuming Args and SplitMode are defined as in lib_iso_split example
907///
908/// fn main() -> Result<()> {
909/// let args = cli::Args {
910/// file: PathBuf::from("input.fastq.gz"),
911/// outdir: PathBuf::from("fq_chunks"),
912/// threads: 4, // Not directly used in split_fq, but passed to helpers
913/// suffix: Some("batch".to_string()),
914/// mode_chunk_size: None,
915/// mode_num_files: Some(5), // Split into 5 output files
916/// };
917/// // split_fq(&args)?;
918/// println!("Successfully split FASTQ file.");
919/// Ok(())
920/// }
921/// ```
922pub fn split_fq(args: &Args) -> anyhow::Result<()> {
923 log::info!("INFO: running in FASTQ mode with args: {:?}", &args);
924
925 let mode = args.mode()?;
926 let suffix = args.suffix.clone().unwrap_or_default();
927
928 match mode {
929 SplitMode::ChunkSize(records_per_file) => {
930 if records_per_file == 0 {
931 anyhow::bail!("ERROR: --chunks must be greater than 0");
932 }
933 log::info!("INFO: splitting by chunk size!");
934 split_fastq_by_chunk_size(&args.file, &args.outdir, records_per_file, &suffix)
935 }
936 SplitMode::NumFiles(num_files) => {
937 if num_files == 0 {
938 anyhow::bail!("ERROR: --files must be greater than 0");
939 }
940 log::info!("INFO: splitting by number of files!");
941 let total_records = count_fastq_records(&args.file)?;
942 split_fastq_by_num_files(&args.file, &args.outdir, num_files, total_records, &suffix)
943 }
944 SplitMode::FileHeader => anyhow::bail!(
945 "ERROR: --headers is only supported for FASTA/FASTA.GZ files, not FASTQ/FASTQ.GZ"
946 ),
947 }
948}
949
/// One FASTQ record, held as its four text lines.
///
/// `FastqWriter::write_record` writes the fields in order, each followed by a
/// single `\n`. The stored strings should therefore not carry their own line
/// terminators.
///
/// # Fields
///
/// * `header` - The header line (conventionally starts with `@`)
/// * `seq` - The sequence line
/// * `plus` - The separator line (conventionally just `+`)
/// * `qual` - The quality-score line
///
/// # Example
///
/// ```rust, ignore
/// let record = FastqRecord {
///     header: "@seq1".to_string(),
///     seq: "ACGT".to_string(),
///     plus: "+".to_string(),
///     qual: "IIII".to_string(),
/// };
/// ```
struct FastqRecord {
    /// Header line.
    header: String,
    /// Sequence line.
    seq: String,
    /// Separator line.
    plus: String,
    /// Quality line.
    qual: String,
}
975
/// Output sink for one FASTQ chunk file, either plain or gzip-compressed.
///
/// # Variants
///
/// * `Plain` - Buffered writer directly on the output file.
/// * `Gzip` - Buffered writer in front of a gzip encoder that writes to the
///   output file. Per the finalizing method's docs on this type, the gzip
///   stream is finished when the writer is closed.
enum FastqWriter {
    /// Uncompressed output.
    Plain(BufWriter<File>),
    /// Gzip-compressed output; bytes are buffered before reaching the encoder.
    Gzip(BufWriter<GzEncoder<File>>),
}
988
989impl FastqWriter {
990 /// Creates a new FASTQ writer for the specified file index.
991 ///
992 /// # Arguments
993 ///
994 /// * `out_dir` - Output directory path
995 /// * `index` - File index number (used in filename)
996 /// * `suffix` - Optional suffix for the filename
997 /// * `gzip_output` - Whether to use gzip compression
998 ///
999 /// # Returns
1000 ///
1001 /// * `anyhow::Result<Self>` - The created writer
1002 ///
1003 /// # Example
1004 ///
1005 /// ```rust, ignore
1006 /// use std::path::Path;
1007 ///
1008 /// let writer = FastqWriter::create(Path::new("output"), 0, "part", true).unwrap();
1009 /// ```
1010 fn create(
1011 out_dir: &Path,
1012 index: usize,
1013 suffix: &str,
1014 gzip_output: bool,
1015 ) -> anyhow::Result<Self> {
1016 let extension = if gzip_output { "fastq.gz" } else { "fastq" };
1017 let path = out_dir.join(format!("tmp_chunk_{index:03}_{suffix}.{extension}"));
1018 let file = File::create(path)?;
1019
1020 if gzip_output {
1021 let encoder = GzEncoder::new(file, Compression::fast());
1022 Ok(FastqWriter::Gzip(BufWriter::new(encoder)))
1023 } else {
1024 Ok(FastqWriter::Plain(BufWriter::new(file)))
1025 }
1026 }
1027
1028 /// Writes a single FASTQ record to the output.
1029 ///
1030 /// # Arguments
1031 ///
1032 /// * `record` - The FastqRecord to write
1033 ///
1034 /// # Returns
1035 ///
1036 /// * `anyhow::Result<()>` - Success or error
1037 ///
1038 /// # Example
1039 ///
1040 /// ```rust, ignore
1041 /// use crate::FastqRecord;
1042 ///
1043 /// let record = FastqRecord {
1044 /// header: "@seq1".to_string(),
1045 /// seq: "ACGT".to_string(),
1046 /// plus: "+".to_string(),
1047 /// qual: "IIII".to_string(),
1048 /// };
1049 /// // writer.write_record(&record).unwrap();
1050 /// ```
1051 fn write_record(&mut self, record: &FastqRecord) -> anyhow::Result<()> {
1052 match self {
1053 FastqWriter::Plain(writer) => {
1054 writer.write_all(record.header.as_bytes())?;
1055 writer.write_all(b"\n")?;
1056 writer.write_all(record.seq.as_bytes())?;
1057 writer.write_all(b"\n")?;
1058 writer.write_all(record.plus.as_bytes())?;
1059 writer.write_all(b"\n")?;
1060 writer.write_all(record.qual.as_bytes())?;
1061 writer.write_all(b"\n")?;
1062 Ok(())
1063 }
1064 FastqWriter::Gzip(writer) => {
1065 writer.write_all(record.header.as_bytes())?;
1066 writer.write_all(b"\n")?;
1067 writer.write_all(record.seq.as_bytes())?;
1068 writer.write_all(b"\n")?;
1069 writer.write_all(record.plus.as_bytes())?;
1070 writer.write_all(b"\n")?;
1071 writer.write_all(record.qual.as_bytes())?;
1072 writer.write_all(b"\n")?;
1073 Ok(())
1074 }
1075 }
1076 }
1077
1078 /// Finalizes and closes the writer, flushing any buffers.
1079 ///
1080 /// For gzip writers, also finishes the gzip stream.
1081 ///
1082 /// # Returns
1083 ///
1084 /// * `anyhow::Result<()>` - Success or error
1085 ///
1086 /// # Example
1087 ///
1088 /// ```rust, ignore
1089 /// // writer.finalize().unwrap();
1090 /// ```
1091 fn finalize(self) -> anyhow::Result<()> {
1092 match self {
1093 FastqWriter::Plain(mut writer) => {
1094 writer.flush()?;
1095 Ok(())
1096 }
1097 FastqWriter::Gzip(mut writer) => {
1098 writer.flush()?;
1099 let encoder = writer
1100 .into_inner()
1101 .map_err(|e| anyhow::anyhow!("ERROR: failed to flush gzip writer: {}", e))?;
1102 let _ = encoder.finish()?;
1103 Ok(())
1104 }
1105 }
1106 }
1107}
1108
1109/// Opens a FASTQ file for reading, handling gzip decompression.
1110///
1111/// # Arguments
1112///
1113/// * `input` - Path to the input FASTQ file
1114///
1115/// # Returns
1116///
1117/// * `anyhow::Result<Box<dyn BufRead>>` - A buffered reader
1118///
1119/// # Example
1120///
1121/// ```rust, ignore
1122/// use std::path::Path;
1123///
1124/// let reader = open_fastq_reader(Path::new("input.fq.gz")).unwrap();
1125/// ```
1126fn open_fastq_reader(input: &Path) -> anyhow::Result<Box<dyn BufRead>> {
1127 let file = File::open(input)?;
1128 if fastq_is_gz(input) {
1129 Ok(Box::new(BufReader::new(MultiGzDecoder::new(file))))
1130 } else {
1131 Ok(Box::new(BufReader::new(file)))
1132 }
1133}
1134
1135/// Reads the next FASTQ record from an iterator of lines.
1136///
1137/// # Arguments
1138///
1139/// * `lines` - Iterator of lines from the FASTQ file
1140///
1141/// # Returns
1142///
1143/// * `anyhow::Result<Option<FastqRecord>>` - The record or None at EOF
1144///
1145/// # Errors
1146///
1147/// Returns an error if the FASTQ format is malformed
1148///
1149/// # Example
1150///
1151/// ```rust, ignore
1152/// use std::io::Cursor;
1153///
1154/// let data = "@seq1\nACGT\n+\nIIII\n@seq2\nTGCA\n+\nJJJJ";
1155/// let mut lines = Cursor::new(data).lines();
1156/// let record = next_fastq_record(&mut lines).unwrap();
1157/// ```
1158fn next_fastq_record<I>(lines: &mut I) -> anyhow::Result<Option<FastqRecord>>
1159where
1160 I: Iterator<Item = std::io::Result<String>>,
1161{
1162 let header = match lines.next() {
1163 Some(h) => h?,
1164 None => return Ok(None),
1165 };
1166
1167 let seq = lines
1168 .next()
1169 .ok_or_else(|| anyhow::anyhow!("ERROR: unexpected EOF in FASTQ record"))??;
1170 let plus = lines
1171 .next()
1172 .ok_or_else(|| anyhow::anyhow!("ERROR: unexpected EOF in FASTQ record"))??;
1173 let qual = lines
1174 .next()
1175 .ok_or_else(|| anyhow::anyhow!("ERROR: unexpected EOF in FASTQ record"))??;
1176
1177 Ok(Some(FastqRecord {
1178 header,
1179 seq,
1180 plus,
1181 qual,
1182 }))
1183}
1184
1185/// Splits a FASTQ file into chunks of a fixed number of records.
1186///
1187/// # Arguments
1188///
1189/// * `input` - Path to the input FASTQ file
1190/// * `out_dir` - Output directory path
1191/// * `records_per_file` - Maximum records per output file
1192/// * `suffix` - Optional suffix for output filenames
1193///
1194/// # Returns
1195///
1196/// * `anyhow::Result<()>` - Success or error
1197///
1198/// # Example
1199///
1200/// ```rust, ignore
1201/// use std::path::Path;
1202///
1203/// split_fastq_by_chunk_size(
1204/// Path::new("input.fq.gz"),
1205/// Path::new("output"),
1206/// 1000,
1207/// "part"
1208/// ).unwrap();
1209/// ```
1210fn split_fastq_by_chunk_size(
1211 input: &Path,
1212 out_dir: &Path,
1213 records_per_file: usize,
1214 suffix: &str,
1215) -> anyhow::Result<()> {
1216 create_dir_all(out_dir)?;
1217
1218 let gzip_output = fastq_is_gz(input);
1219 let mut lines = open_fastq_reader(input)?.lines();
1220 let mut writer: Option<FastqWriter> = None;
1221 let mut file_index = 0usize;
1222 let mut record_index = 0usize;
1223
1224 while let Some(record) = next_fastq_record(&mut lines)? {
1225 if writer.is_none() {
1226 writer = Some(FastqWriter::create(
1227 out_dir,
1228 file_index,
1229 suffix,
1230 gzip_output,
1231 )?);
1232 } else if record_index == records_per_file {
1233 if let Some(current_writer) = writer.take() {
1234 current_writer.finalize()?;
1235 }
1236 file_index += 1;
1237 record_index = 0;
1238 writer = Some(FastqWriter::create(
1239 out_dir,
1240 file_index,
1241 suffix,
1242 gzip_output,
1243 )?);
1244 }
1245
1246 if let Some(current_writer) = writer.as_mut() {
1247 current_writer.write_record(&record)?;
1248 }
1249 record_index += 1;
1250 }
1251
1252 if let Some(current_writer) = writer {
1253 current_writer.finalize()?;
1254 }
1255
1256 Ok(())
1257}
1258
1259/// Splits a FASTQ file into a specified number of output files.
1260///
1261/// Distributes records as evenly as possible across the output files.
1262///
1263/// # Arguments
1264///
1265/// * `input` - Path to the input FASTQ file
1266/// * `out_dir` - Output directory path
1267/// * `num_files` - Number of output files to create
1268/// * `total_records` - Total number of records in the input
1269/// * `suffix` - Optional suffix for output filenames
1270///
1271/// # Returns
1272///
1273/// * `anyhow::Result<()>` - Success or error
1274///
1275/// # Example
1276///
1277/// ```rust, ignore
1278/// use std::path::Path;
1279///
1280/// split_fastq_by_num_files(
1281/// Path::new("input.fq.gz"),
1282/// Path::new("output"),
1283/// 10,
1284/// 5000,
1285/// "part"
1286/// ).unwrap();
1287/// ```
1288fn split_fastq_by_num_files(
1289 input: &Path,
1290 out_dir: &Path,
1291 num_files: usize,
1292 total_records: usize,
1293 suffix: &str,
1294) -> anyhow::Result<()> {
1295 create_dir_all(out_dir)?;
1296
1297 let gzip_output = fastq_is_gz(input);
1298 let mut lines = open_fastq_reader(input)?.lines();
1299
1300 let base_records = total_records / num_files;
1301 let remainder = total_records % num_files;
1302
1303 for file_idx in 0..num_files {
1304 let records_for_file = if file_idx < remainder {
1305 base_records + 1
1306 } else {
1307 base_records
1308 };
1309
1310 let mut writer = FastqWriter::create(out_dir, file_idx, suffix, gzip_output)?;
1311 for _ in 0..records_for_file {
1312 let record = next_fastq_record(&mut lines)?
1313 .ok_or_else(|| anyhow::anyhow!("ERROR: unexpected EOF while splitting FASTQ"))?;
1314 writer.write_record(&record)?;
1315 }
1316 writer.finalize()?;
1317 }
1318
1319 if next_fastq_record(&mut lines)?.is_some() {
1320 anyhow::bail!("ERROR: FASTQ reader still has remaining records after splitting");
1321 }
1322
1323 Ok(())
1324}
1325
1326/// Counts the total number of records in a FASTQ file.
1327///
1328/// FASTQ files have 4 lines per record, so this counts lines and divides by 4.
1329///
1330/// # Arguments
1331///
1332/// * `input` - Path to the input FASTQ file
1333///
1334/// # Returns
1335///
1336/// * `anyhow::Result<usize>` - The total number of records
1337///
1338/// # Errors
1339///
1340/// Returns an error if the file is malformed (line count not divisible by 4)
1341///
1342/// # Example
1343///
1344/// ```rust, ignore
1345/// use std::path::Path;
1346///
1347/// let count = count_fastq_records(Path::new("input.fq.gz")).unwrap();
1348/// ```
1349fn count_fastq_records(input: &Path) -> anyhow::Result<usize> {
1350 let mut lines = open_fastq_reader(input)?.lines();
1351 let mut line_count = 0usize;
1352
1353 while let Some(line) = lines.next() {
1354 line?;
1355 line_count += 1;
1356 }
1357
1358 if !line_count.is_multiple_of(4) {
1359 anyhow::bail!(
1360 "ERROR: malformed FASTQ input: line count {} is not divisible by 4",
1361 line_count
1362 );
1363 }
1364
1365 Ok(line_count / 4)
1366}