Skip to main content

fxsplit/
lib.rs

1#![allow(clippy::while_let_on_iterator)]
2#![allow(clippy::doc_overindented_list_items)]
3
4//! Core module for splitting a .fa/.fq file into chunks
5//! Alejandro Gonzales-Irribarren, 2025
6//!
7//! This module contains the main function for splitting .fa/.fq files
8//! based on custom requirements in parallel.
9//!
10//! In short, the module accepts any type of .fa or .fq file
11//! and process the reads or sequences inside them in parallel
12//! when is possible. Compressed files are also accepted. The
13//! user has the ability to specify is the splitting process should
14//! be done based on specific chunk sizes or number of files, and
15//! the amount of parallelization that should be used in the process.
16
17use std::{
18    collections::HashSet,
19    fs::{create_dir_all, File},
20    io::{BufRead, BufReader, BufWriter, Read, Write},
21    os::unix::fs::symlink,
22    path::{Path, PathBuf},
23    sync::Arc,
24};
25
26use anyhow::Result;
27use flate2::read::MultiGzDecoder;
28use flate2::write::GzEncoder;
29use flate2::Compression;
30use memchr::memchr_iter;
31use memmap2::Mmap;
32use rayon::prelude::*;
33
34const FA_NEEDLE: u8 = b'>';
35const HEADER_SPLIT_WARN_THRESHOLD: usize = 100_000;
36
37pub mod cli;
38use cli::Args;
39
40/// Dispatches file processing based on its suffix.
41///
42/// This macro inspects the file name of the given `Path` and attempts to match
43/// its suffix against a predefined set of patterns. For each matched suffix,
44/// it executes the corresponding action. If no suffix matches, it logs an error
45/// and bails out, indicating an unrecognized file format.
46///
47/// This provides a convenient way to route different file types to specific
48/// processing functions, often based on common bioinformatics file extensions
49/// like `.fa.gz` or `.fastq`.
50///
51/// # Arguments
52///
53/// * `$file` - An expression that evaluates to a reference to a `Path` or `PathBuf`,
54///             representing the input file.
55/// * `$suffixes_and_actions` - A block containing `suffix => $action` pairs,
56///                             where `suffix` is a literal string to match
57///                             against the end of the file name, and `$action`
58///                             is a block of code to execute if the suffix matches.
59///
60/// # Panics
61///
62/// This macro will `unwrap_or_default()` on `file_name().to_str()`, which
63/// means it expects valid UTF-8 file names for matching. Non-UTF-8 file names
64/// will result in an empty string for `f`, which might not match any suffix.
65///
66/// # Errors
67///
68/// Returns an `anyhow::Error` if no suffix matches the input file's name.
69///
70/// # Example
71///
72/// ```rust, ignore
73/// use std::path::PathBuf;
74/// use anyhow::Result;
75///
76/// // Assume these functions exist for the example
77/// fn process_fasta(path: &PathBuf) -> Result<()> { /* ... */ Ok(()) }
78/// fn process_fastq(path: &PathBuf) -> Result<()> { /* ... */ Ok(()) }
79///
80/// let my_file = PathBuf::from("data/sequences.fa.gz");
81///
82/// dispatch!(&my_file, {
83///     "fa.gz" => process_fasta(&my_file)?,
84///     "fq.gz" => process_fastq(&my_file)?,
85///     "fa" => process_fasta(&my_file)?,
86/// });
87/// ```
88#[macro_export]
89macro_rules! dispatch {
90    ($file:expr, { $($suffix:literal => $action:expr),* $(,)?}) => {{
91        let f = $file.file_name().and_then(|f| f.to_str()).unwrap_or_default();
92        let mut matched = false;
93        $(
94            if f.ends_with($suffix) {
95                matched = true;
96                $action
97            }
98        )*
99        if !matched {
100            dbg!(f);
101            anyhow::bail!("ERROR: unrecognized file format: {}", $file.display());
102        }
103    }};
104}
105
106/// Checks if a file path ends with a specific suffix.
107///
108/// This helper function checks if the file name portion of a path ends with the given suffix.
109/// It handles the case where the file name might not be valid UTF-8.
110///
111/// # Arguments
112///
113/// * `path` - The path to check
114/// * `suffix` - The suffix to look for (e.g., "fa.gz", "fasta")
115///
116/// # Returns
117///
118/// * `true` if the file name ends with the suffix, `false` otherwise
119///
120/// # Example
121///
122/// ```rust, ignore
123/// use std::path::Path;
124///
125/// let path = Path::new("data/sequences.fa.gz");
126/// assert!(path_ends_with(&path, "fa.gz"));
127/// ```
128fn path_ends_with(path: &Path, suffix: &str) -> bool {
129    path.file_name()
130        .and_then(|f| f.to_str())
131        .map(|name| name.ends_with(suffix))
132        .unwrap_or(false)
133}
134
135/// Determines the output file extension for non-gzipped FASTA files.
136///
137/// # Arguments
138///
139/// * `path` - The input file path
140///
141/// # Returns
142///
143/// * `"fasta"` if the path ends with "fasta", otherwise `"fa"`
144///
145/// # Example
146///
147/// ```rust, ignore
148/// use std::path::Path;
149///
150/// let ext = fasta_output_extension(&Path::new("sequences.fasta"));
151/// assert_eq!(ext, "fasta");
152/// ```
153fn fasta_output_extension(path: &Path) -> &'static str {
154    if path_ends_with(path, "fasta") {
155        "fasta"
156    } else {
157        "fa"
158    }
159}
160
161/// Determines the output file extension for gzipped FASTA files.
162///
163/// # Arguments
164///
165/// * `path` - The input file path
166///
167/// # Returns
168///
169/// * `"fasta.gz"` if the path ends with "fasta.gz", otherwise `"fa.gz"`
170///
171/// # Example
172///
173/// ```rust, ignore
174/// use std::path::Path;
175///
176/// let ext = fasta_gz_output_extension(&Path::new("sequences.fasta.gz"));
177/// assert_eq!(ext, "fasta.gz");
178/// ```
179fn fasta_gz_output_extension(path: &Path) -> &'static str {
180    if path_ends_with(path, "fasta.gz") {
181        "fasta.gz"
182    } else {
183        "fa.gz"
184    }
185}
186
187/// Checks if the input file is gzipped based on its extension.
188///
189/// # Arguments
190///
191/// * `path` - The input file path
192///
193/// # Returns
194///
195/// * `true` if the path ends with ".gz", `false` otherwise
196///
197/// # Example
198///
199/// ```rust, ignore
200/// use std::path::Path;
201///
202/// assert!(fastq_is_gz(&Path::new("sequences.fq.gz")));
203/// assert!(!fastq_is_gz(&Path::new("sequences.fq")));
204/// ```
205fn fastq_is_gz(path: &Path) -> bool {
206    path_ends_with(path, ".gz")
207}
208
209/// Finds all FASTA header positions in the given data.
210///
211/// Scans the byte array for the FASTA header marker (`>`) and returns
212/// the byte positions where headers start. Filters to ensure headers
213/// are at the start of the file or after a newline.
214///
215/// # Arguments
216///
217/// * `data` - The raw file content as bytes
218///
219/// # Returns
220///
221/// * A `Vec<usize>` containing byte positions of all FASTA headers
222///
223/// # Example
224///
225/// ```rust, ignore
226/// let data = b">header1\nACGT\n>header2\nTGCA";
227/// let headers = find_fasta_headers(data);
228/// assert_eq!(headers, vec![0, 12]);
229/// ```
230fn find_fasta_headers(data: &[u8]) -> Vec<usize> {
231    memchr_iter(FA_NEEDLE, data)
232        .filter(|&i| i == 0 || data.get(i - 1) == Some(&b'\n'))
233        .collect()
234}
235
236/// Extracts the FASTA record ID from a header line.
237///
238/// Parses the header starting at the given position and extracts the
239/// first token (typically the sequence ID before any whitespace).
240///
241/// # Arguments
242///
243/// * `data` - The raw file content as bytes
244/// * `header_start` - The byte position where the header starts
245///
246/// # Returns
247///
248/// * The extracted ID as a `String`, or "record" if extraction fails
249///
250/// # Example
251///
252/// ```rust, ignore
253/// let data = b">seq1 description here\nACGT";
254/// let id = extract_fasta_id(data, 0);
255/// assert_eq!(id, "seq1");
256/// ```
257fn extract_fasta_id(data: &[u8], header_start: usize) -> String {
258    let start = header_start.saturating_add(1);
259    if start >= data.len() {
260        return "record".to_string();
261    }
262
263    let line_end = data[start..]
264        .iter()
265        .position(|&b| b == b'\n')
266        .map(|i| start + i)
267        .unwrap_or(data.len());
268
269    let mut line = &data[start..line_end];
270    if line.last() == Some(&b'\r') {
271        line = &line[..line.len().saturating_sub(1)];
272    }
273
274    let token_end = line
275        .iter()
276        .position(|b| b.is_ascii_whitespace())
277        .unwrap_or(line.len());
278
279    let token = &line[..token_end];
280    String::from_utf8_lossy(token).to_string()
281}
282
283/// Sanitizes a FASTA ID for use as a filename.
284///
285/// Replaces non-alphanumeric characters (except underscore, hyphen, dot)
286/// with underscores. Collapses multiple consecutive underscores into one
287/// and trims leading/trailing underscores.
288///
289/// # Arguments
290///
291/// * `id` - The raw FASTA ID to sanitize
292///
293/// # Returns
294///
295/// * A sanitized string safe for use as a filename
296///
297/// # Example
298///
299/// ```rust, ignore
300/// let sanitized = sanitize_fasta_id("seq_1 description");
301/// assert_eq!(sanitized, "seq_1_description");
302/// ```
303fn sanitize_fasta_id(id: &str) -> String {
304    let mut out = String::with_capacity(id.len());
305    let mut prev_is_underscore = false;
306
307    for ch in id.chars() {
308        let mapped = if ch.is_ascii_alphanumeric() || matches!(ch, '_' | '-' | '.') {
309            ch
310        } else {
311            '_'
312        };
313
314        if mapped == '_' {
315            if prev_is_underscore {
316                continue;
317            }
318            prev_is_underscore = true;
319        } else {
320            prev_is_underscore = false;
321        }
322
323        out.push(mapped);
324    }
325
326    let trimmed = out.trim_matches('_');
327    if trimmed.is_empty() {
328        "record".to_string()
329    } else {
330        trimmed.to_string()
331    }
332}
333
334/// Generates unique filenames for each FASTA header in the input.
335///
336/// Creates filenames based on sanitized FASTA IDs, handling duplicates
337/// by appending numeric suffixes. Each filename includes the specified
338/// suffix and file extension.
339///
340/// # Arguments
341///
342/// * `data` - The raw file content as bytes
343/// * `headers` - Vector of byte positions for each header
344/// * `suffix` - Optional suffix to append to filenames
345/// * `extension` - File extension to use (e.g., "fa", "fasta.gz")
346///
347/// # Returns
348///
349/// * A `Vec<String>` of unique filenames, one per header
350///
351/// # Example
352///
353/// ```rust, ignore
354/// let data = b">seq1\nACGT\n>seq2\nTGCA";
355/// let headers = vec![0, 12];
356/// let names = make_unique_header_filenames(data, &headers, "part", "fa");
357/// assert_eq!(names, vec!["seq1_part.fa", "seq2_part.fa"]);
358/// ```
359fn make_unique_header_filenames(
360    data: &[u8],
361    headers: &[usize],
362    suffix: &str,
363    extension: &str,
364) -> Vec<String> {
365    let mut used = HashSet::with_capacity(headers.len());
366    let mut names = Vec::with_capacity(headers.len());
367
368    for &header_start in headers {
369        let raw_id = extract_fasta_id(data, header_start);
370        let sanitized = sanitize_fasta_id(&raw_id);
371
372        let base_name = if suffix.is_empty() {
373            sanitized
374        } else {
375            format!("{sanitized}_{suffix}")
376        };
377
378        let mut candidate = base_name.clone();
379        let mut duplicate_idx = 1usize;
380        while used.contains(&candidate) {
381            candidate = format!("{base_name}_{duplicate_idx}");
382            duplicate_idx += 1;
383        }
384
385        used.insert(candidate.clone());
386        names.push(format!("{candidate}.{extension}"));
387    }
388
389    names
390}
391
392/// Logs a warning if the number of output files would be excessive.
393///
394/// Warns the user when splitting by header would create more than
395/// 100,000 output files, as this may cause filesystem issues.
396///
397/// # Arguments
398///
399/// * `num_headers` - The number of headers (and thus output files)
400/// * `input` - The input file path for context in the warning
401///
402/// # Example
403///
404/// ```rust, ignore
405/// use std::path::Path;
406///
407/// warn_if_many_header_outputs(150000, &Path::new("input.fa"));
408/// // Logs warning if logging is enabled
409/// ```
410fn warn_if_many_header_outputs(num_headers: usize, input: &Path) {
411    if num_headers > HEADER_SPLIT_WARN_THRESHOLD {
412        log::warn!(
413            "WARNING: input {} contains {} FASTA headers; --headers will create {} output files",
414            input.display(),
415            num_headers,
416            num_headers
417        );
418    }
419}
420
421/// Calculates byte regions for each chunk based on the split mode.
422///
423/// # Arguments
424///
425/// * `headers` - Vector of byte positions for each FASTA header
426/// * `data_len` - Total length of the data in bytes
427/// * `mode` - The splitting mode (ChunkSize, NumFiles, or FileHeader)
428///
429/// # Returns
430///
431/// * A `Result<Vec<ChunkRegion>>` containing start/end byte positions
432///
433/// # Errors
434///
435/// Returns an error if chunk size or number of files is 0
436///
437/// # Example
438///
439/// ```rust, ignore
440/// use crate::{SplitMode, ChunkRegion};
441///
442/// let headers = vec![0, 50, 100, 150];
443/// let regions = chunk_regions_for_mode(&headers, 200, SplitMode::ChunkSize(2)).unwrap();
444/// assert_eq!(regions.len(), 2);
445/// ```
446fn chunk_regions_for_mode(
447    headers: &[usize],
448    data_len: usize,
449    mode: SplitMode,
450) -> Result<Vec<ChunkRegion>> {
451    match mode {
452        SplitMode::ChunkSize(records_per_file) => {
453            if records_per_file == 0 {
454                anyhow::bail!("ERROR: --chunks must be greater than 0");
455            }
456
457            let nchunks = headers.len().div_ceil(records_per_file);
458
459            Ok((0..nchunks)
460                .map(|i| {
461                    let start = *headers.get(i * records_per_file).unwrap_or(&data_len);
462                    let end = *headers.get((i + 1) * records_per_file).unwrap_or(&data_len);
463                    ChunkRegion { start, end }
464                })
465                .collect())
466        }
467        SplitMode::NumFiles(files) => {
468            if files == 0 {
469                anyhow::bail!("ERROR: --files must be greater than 0");
470            }
471
472            // let records_per_file = (headers.len() + files - 1) / files;
473            let records_per_file = headers.len().div_ceil(files);
474
475            Ok((0..files)
476                .map(|i| {
477                    let start_idx = i * records_per_file;
478                    let end_idx = ((i + 1) * records_per_file).min(headers.len());
479                    let start = *headers.get(start_idx).unwrap_or(&data_len);
480                    let end = *headers.get(end_idx).unwrap_or(&data_len);
481                    ChunkRegion { start, end }
482                })
483                .collect())
484        }
485        SplitMode::FileHeader => Ok((0..headers.len())
486            .map(|i| {
487                let start = headers[i];
488                let end = *headers.get(i + 1).unwrap_or(&data_len);
489                ChunkRegion { start, end }
490            })
491            .collect()),
492    }
493}
494
495/// Splits large sequencing files (FASTA, FASTQ, gzipped versions) into smaller chunks or files.
496///
497/// This is the main entry point for the `iso-split` functionality. It parses
498/// command-line arguments, determines the input file type based on its extension,
499/// and dispatches to the appropriate splitting function (`split_fa`, `split_fa_gz`, `split_fq`).
500///
501/// The splitting logic depends on the `SplitMode` specified in the `Args`
502/// (either by a fixed number of records per chunk/file or by a target number of output files).
503///
504/// # Arguments
505///
506/// * `args` - A `Vec<String>` representing the command-line arguments passed to the program.
507///
508/// # Returns
509///
510/// * `Result<()>` - An `Ok(())` on successful completion, or an `anyhow::Error` if
511///                 argument parsing fails, the file format is unrecognized, or
512///                 any splitting operation encounters an error.
513///
514/// # Errors
515///
516/// * Returns an error if argument parsing (`cli::Args::from`) fails.
517/// * Returns an error if the input file's suffix is not recognized by the `dispatch!` macro.
518/// * Returns any error propagated from the called splitting functions (`split_fa`, etc.).
519///
520/// # Example
521///
522/// ```rust, ignore
523/// fn main() -> Result<()> {
524///     lib_iso_split(vec!["--file".to_string(), "input.fasta.gz".to_string(), "--chunk-size".to_string(), "1000".to_string()])
525///     Ok(())
526/// }
527/// ```
528pub fn lib_iso_split(args: Vec<String>) -> Result<()> {
529    let args = cli::Args::from(args);
530
531    dispatch!(&args.file, {
532        "fa.gz" => split_fa_gz(&args)?,
533        "fasta.gz" =>  split_fa_gz(&args)?,
534        "fq.gz" =>  split_fq(&args)?,
535        "fastq.gz" =>  split_fq(&args)?,
536        "fq" =>  split_fq(&args)?,
537        "fastq" =>  split_fq(&args)?,
538        "fa" =>  split_fa(&args)?,
539        "fasta" =>  split_fa(&args)?,
540    });
541
542    Ok(())
543}
544
545/// Splits a non-gzipped FASTA file into multiple smaller FASTA files.
546///
547/// This function reads a FASTA file, identifies the start positions of all records
548/// using `memchr_iter` to find `FA_NEEDLE` (typically `>`). It then divides the
549/// file's content (memory-mapped for efficiency) into chunks based on the
550/// specified `SplitMode` (either `ChunkSize` or `NumFiles`). Each chunk is then
551/// written to a new output file within the designated output directory, utilizing
552/// a Rayon thread pool for parallel processing.
553///
554/// # Arguments
555///
556/// * `args` - A reference to an `Args` struct containing the input file path,
557///            output directory, number of threads, splitting mode, and an optional suffix.
558///
559/// # Returns
560///
561/// * `Result<()>` - An `Ok(())` on successful completion, or an `anyhow::Error` if
562///                 any operation (file opening, memory mapping, directory creation,
563///                 writing to files, or thread pool building) fails.
564///
565/// # Errors
566///
567/// * Returns an error if the input FASTA file cannot be opened or memory-mapped.
568/// * Returns an error if no FASTA records are found in the input file.
569/// * Returns an error if the output directory cannot be created.
570/// * Returns an error if `SplitMode::NumFiles` is 0.
571/// * Returns any `std::io::Error` during file writing.
572///
573/// # Parallelism
574///
575/// This function uses `rayon` for parallel processing of chunks, improving
576/// performance for large files. The number of threads is configured via `args.threads`.
577///
578/// # Example
579///
580/// ```rust, ignore
581/// use anyhow::Result;
582/// use std::path::PathBuf;
583/// // Assuming Args and SplitMode are defined as in lib_iso_split example
584///
585/// fn main() -> Result<()> {
586///     let args = cli::Args {
587///         file: PathBuf::from("input.fa"),
588///         outdir: PathBuf::from("fa_chunks"),
589///         threads: 4,
590///         suffix: Some("part".to_string()),
591///         mode_chunk_size: Some(100), // Split into chunks of 100 records
592///         mode_num_files: None,
593///     };
594///     // split_fa(&args)?;
595///     println!("Successfully split FASTA file.");
596///     Ok(())
597/// }
598/// ```
599pub fn split_fa(args: &Args) -> Result<()> {
600    log::info!("INFO: running in FASTA mode with args: {:?}", &args);
601
602    let pool = rayon::ThreadPoolBuilder::new()
603        .num_threads(args.threads)
604        .build()?;
605
606    let file = File::open(&args.file)?;
607    let mmap = unsafe { Mmap::map(&file)? };
608    let data = Arc::new(mmap);
609
610    let header = find_fasta_headers(data.as_ref());
611    if header.is_empty() {
612        anyhow::bail!("ERROR: No FASTA records found");
613    }
614
615    let mode = args.mode()?;
616    let suffix = args.suffix.clone().unwrap_or_default();
617    let extension = fasta_output_extension(&args.file);
618    create_dir_all(&args.outdir)?;
619
620    if matches!(mode, SplitMode::FileHeader) {
621        warn_if_many_header_outputs(header.len(), &args.file);
622        let filenames = make_unique_header_filenames(data.as_ref(), &header, &suffix, extension);
623
624        pool.install(|| {
625            (0..header.len())
626                .into_par_iter()
627                .try_for_each(|i| -> Result<()> {
628                    let start = header[i];
629                    let end = *header.get(i + 1).unwrap_or(&data.len());
630                    let output_file = PathBuf::from(&args.outdir).join(&filenames[i]);
631                    let mut writer = BufWriter::new(File::create(output_file)?);
632                    writer.write_all(&data[start..end])?;
633                    writer.flush()?;
634                    Ok(())
635                })
636        })?;
637
638        return Ok(());
639    }
640
641    let chunks = chunk_regions_for_mode(&header, data.len(), mode)?;
642    let prefix = match mode {
643        SplitMode::NumFiles(_) => "part",
644        _ => "chunk",
645    };
646
647    pool.install(|| {
648        chunks
649            .into_par_iter()
650            .enumerate()
651            .try_for_each(|(i, chunk)| -> Result<()> {
652                let output_file = PathBuf::from(&args.outdir)
653                    .join(format!("tmp_{prefix}_{:03}_{suffix}.{extension}", i));
654                let mut writer = BufWriter::new(File::create(output_file)?);
655                writer.write_all(&data[chunk.start..chunk.end])?;
656                writer.flush()?;
657                Ok(())
658            })
659    })?;
660
661    Ok(())
662}
663
664/// Splits a gzipped FASTA file (`.fa.gz` or `.fasta.gz`) into multiple smaller gzipped FASTA files.
665///
666/// This function first decompresses the entire gzipped input file into memory.
667/// It then identifies the start positions of all FASTA records using `memchr_iter`.
668/// The decompressed data is divided into chunks based on the `SplitMode` (either
669/// `ChunkSize` for records per file or `NumFiles` for a target number of files).
670/// Each chunk is then individually compressed and written to a new gzipped output file
671/// within the specified output directory, leveraging a global Rayon thread pool for parallel execution.
672///
673/// A special case is handled: if the total number of records is less than or equal to
674/// the `records_per_file` when in `ChunkSize` mode, it creates a symlink to the original file
675/// instead of splitting, to avoid unnecessary work.
676///
677/// # Arguments
678///
679/// * `args` - A reference to an `Args` struct containing the input file path,
680///            output directory, number of threads, splitting mode, and an optional suffix.
681///
682/// # Returns
683///
684/// * `Result<()>` - An `Ok(())` on successful completion, or an `anyhow::Error` if
685///                 any operation (file opening, decompression, directory creation,
686///                 writing to files, or thread pool building) fails.
687///
688/// # Errors
689///
690/// * Returns an error if the input gzipped FASTA file cannot be opened or decompressed.
691/// * Returns an error if no FASTA records are found in the decompressed data.
692/// * Returns an error if the output directory cannot be created.
693/// * Returns an error if `SplitMode::NumFiles` is 0.
694/// * Returns any `std::io::Error` during file writing or compression.
695///
696/// # Parallelism
697///
698/// This function uses `rayon` for parallel processing of chunks, improving
699/// performance for large files. It builds a global thread pool with `args.threads`.
700///
701/// # Example
702///
703/// ```rust, ignore
704/// use anyhow::Result;
705/// use std::path::PathBuf;
706/// // Assuming Args and SplitMode are defined as in lib_iso_split example
707///
708/// fn main() -> Result<()> {
709///     let args = cli::Args {
710///         file: PathBuf::from("input.fa.gz"),
711///         outdir: PathBuf::from("fa_gz_chunks"),
712///         threads: 4,
713///         suffix: Some("part".to_string()),
714///         mode_num_files: Some(10), // Split into 10 output files
715///         mode_chunk_size: None,
716///     };
717///     // split_fa_gz(&args)?;
718///     println!("Successfully split gzipped FASTA file.");
719///     Ok(())
720/// }
721/// ```
722pub fn split_fa_gz(args: &Args) -> Result<()> {
723    log::info!("INFO: running in FASTA.GZ mode with args: {:?}", &args);
724
725    let pool = rayon::ThreadPoolBuilder::new()
726        .num_threads(args.threads)
727        .build()?;
728
729    let mut gz = MultiGzDecoder::new(File::open(&args.file)?);
730    let mut decompressed = Vec::new();
731    gz.read_to_end(&mut decompressed)?;
732    let data = Arc::new(decompressed);
733
734    let header = find_fasta_headers(data.as_ref());
735    log::info!(
736        "INFO: Found {} records in {}",
737        header.len(),
738        &args.file.display()
739    );
740
741    if header.is_empty() {
742        log::error!("ERROR: No FASTA records found in decompressed data");
743        anyhow::bail!("ERROR: No FASTA records found in decompressed data");
744    }
745
746    let mode = args.mode()?;
747    let suffix = args.suffix.clone().unwrap_or_default();
748    let extension = fasta_gz_output_extension(&args.file);
749    create_dir_all(&args.outdir)?;
750
751    if let SplitMode::ChunkSize(records_per_file) = mode {
752        if records_per_file == 0 {
753            anyhow::bail!("ERROR: --chunks must be greater than 0");
754        }
755
756        if header.len() <= records_per_file {
757            let outpath =
758                PathBuf::from(&args.outdir).join(format!("tmp_chunk_000_{suffix}.{extension}"));
759            std::fs::create_dir_all(&args.outdir)?;
760            log::warn!(
761                "Only {} records found, <= --chunks {}, creating symlink to original file...",
762                header.len(),
763                records_per_file
764            );
765
766            if args.copy {
767                std::fs::copy(&args.file, &outpath)?;
768            } else {
769                symlink(&args.file, &outpath)?;
770            }
771
772            return Ok(());
773        }
774    }
775
776    if matches!(mode, SplitMode::FileHeader) {
777        warn_if_many_header_outputs(header.len(), &args.file);
778        let filenames = make_unique_header_filenames(data.as_ref(), &header, &suffix, extension);
779
780        pool.install(|| {
781            (0..header.len())
782                .into_par_iter()
783                .try_for_each(|i| -> Result<()> {
784                    let start = header[i];
785                    let end = *header.get(i + 1).unwrap_or(&data.len());
786                    let output_file = PathBuf::from(&args.outdir).join(&filenames[i]);
787                    let file = File::create(output_file)?;
788                    let writer = BufWriter::new(file);
789                    let mut encoder =
790                        flate2::write::GzEncoder::new(writer, flate2::Compression::fast());
791                    encoder.write_all(&data[start..end])?;
792                    encoder.finish()?;
793                    Ok(())
794                })
795        })?;
796
797        return Ok(());
798    }
799
800    let chunks = chunk_regions_for_mode(&header, data.len(), mode)?;
801    let prefix = match mode {
802        SplitMode::NumFiles(_) => "part",
803        _ => "chunk",
804    };
805
806    pool.install(|| {
807        chunks
808            .into_par_iter()
809            .enumerate()
810            .try_for_each(|(i, chunk)| -> Result<()> {
811                let output_file = PathBuf::from(&args.outdir)
812                    .join(format!("tmp_{prefix}_{:03}_{suffix}.{extension}", i));
813                let file = File::create(output_file)?;
814                let writer = BufWriter::new(file);
815                let mut encoder =
816                    flate2::write::GzEncoder::new(writer, flate2::Compression::fast());
817                encoder.write_all(&data[chunk.start..chunk.end])?;
818                encoder.finish()?;
819                Ok(())
820            })
821    })?;
822
823    Ok(())
824}
825
826/// Split mode
827///
828/// Specifies how to split the input file:
829/// - `ChunkSize(n)`: Split into files with at most n records each
830/// - `NumFiles(n)`: Split into exactly n output files
831/// - `FileHeader`: Create one output file per FASTA record
832///
833/// # Arguments
834///
835/// * `ChunkSize(usize)` - Number of records per output file
836/// * `NumFiles(usize)` - Number of output files to create
837/// * `FileHeader` - Split by individual FASTA headers
838///
839/// # Example
840///
841/// ```rust, ignore
842/// use fxsplit::SplitMode;
843///
844/// let mode = SplitMode::ChunkSize(1000);
845/// let mode = SplitMode::NumFiles(10);
846/// let mode = SplitMode::FileHeader;
847/// ```
848#[derive(Debug, Clone, Copy)]
849pub enum SplitMode {
850    ChunkSize(usize), // N records per output file
851    NumFiles(usize),  // K output files
852    FileHeader,       // split by file header
853}
854
855// public structs
856/// Region [iso-split]
857///
858/// This enum is used to store region boundaries
859/// of a fq/fa file
860///
861/// # Example
862///
863/// ```rust, ignore
864/// use iso::Region;
865///
866/// let region = Region{start: 1, end: 43};
867/// assert_eq!(43, region.end);
868/// ```
869#[derive(Debug, Clone)]
870pub struct ChunkRegion {
871    pub start: usize,
872    pub end: usize,
873}
874
875/// Splits a gzipped FASTQ file (`.fq.gz` or `.fastq.gz`) into multiple smaller gzipped FASTQ files.
876///
877/// This function acts as a dispatcher for FASTQ splitting, determining the
878/// records-per-file based on the `SplitMode` (`ChunkSize` or `NumFiles`).
879/// If splitting by `NumFiles`, it first counts all records in the input file
880/// to calculate the appropriate `records_per_file` for even distribution.
881/// It then delegates the actual splitting and writing to `split_by_chunk_size`.
882///
883/// # Arguments
884///
885/// * `args` - A reference to an `Args` struct containing the input file path,
886///            output directory, number of threads (though not directly used here,
887///            passed to underlying functions), splitting mode, and an optional suffix.
888///
889/// # Returns
890///
891/// * `anyhow::Result<()>` - An `Ok(())` on successful completion, or an `anyhow::Error` if
892///                         the splitting mode is invalid, record counting fails, or
893///                         `split_by_chunk_size` encounters an error.
894///
895/// # Errors
896///
897/// * Returns an error if `args.mode()` returns an error (e.g., no split mode specified).
898/// * Returns an error if `count_fastq_records` fails.
899/// * Propagates any error from `split_by_chunk_size`.
900///
901/// # Example
902///
903/// ```rust, ignore
904/// use anyhow::Result;
905/// use std::path::PathBuf;
906/// // Assuming Args and SplitMode are defined as in lib_iso_split example
907///
908/// fn main() -> Result<()> {
909///     let args = cli::Args {
910///         file: PathBuf::from("input.fastq.gz"),
911///         outdir: PathBuf::from("fq_chunks"),
912///         threads: 4, // Not directly used in split_fq, but passed to helpers
913///         suffix: Some("batch".to_string()),
914///         mode_chunk_size: None,
915///         mode_num_files: Some(5), // Split into 5 output files
916///     };
917///     // split_fq(&args)?;
918///     println!("Successfully split FASTQ file.");
919///     Ok(())
920/// }
921/// ```
922pub fn split_fq(args: &Args) -> anyhow::Result<()> {
923    log::info!("INFO: running in FASTQ mode with args: {:?}", &args);
924
925    let mode = args.mode()?;
926    let suffix = args.suffix.clone().unwrap_or_default();
927
928    match mode {
929        SplitMode::ChunkSize(records_per_file) => {
930            if records_per_file == 0 {
931                anyhow::bail!("ERROR: --chunks must be greater than 0");
932            }
933            log::info!("INFO: splitting by chunk size!");
934            split_fastq_by_chunk_size(&args.file, &args.outdir, records_per_file, &suffix)
935        }
936        SplitMode::NumFiles(num_files) => {
937            if num_files == 0 {
938                anyhow::bail!("ERROR: --files must be greater than 0");
939            }
940            log::info!("INFO: splitting by number of files!");
941            let total_records = count_fastq_records(&args.file)?;
942            split_fastq_by_num_files(&args.file, &args.outdir, num_files, total_records, &suffix)
943        }
944        SplitMode::FileHeader => anyhow::bail!(
945            "ERROR: --headers is only supported for FASTA/FASTA.GZ files, not FASTQ/FASTQ.GZ"
946        ),
947    }
948}
949
950/// Represents a single FASTQ record.
951///
952/// # Fields
953///
954/// * `header` - The FASTQ header line (starts with @)
955/// * `seq` - The nucleotide sequence
956/// * `plus` - The plus line (usually just "+")
957/// * `qual` - Quality scores string
958///
959/// # Example
960///
961/// ```rust, ignore
962/// let record = FastqRecord {
963///     header: "@seq1".to_string(),
964///     seq: "ACGT".to_string(),
965///     plus: "+".to_string(),
966///     qual: "IIII".to_string(),
967/// };
968/// ```
969struct FastqRecord {
970    header: String,
971    seq: String,
972    plus: String,
973    qual: String,
974}
975
976/// Writer type for FASTQ output files.
977///
978/// Handles both plain and gzipped FASTQ file output.
979///
980/// # Variants
981///
982/// * `Plain` - Plain text FASTQ writer
983/// * `Gzip` - Gzipped FASTQ writer
984enum FastqWriter {
985    Plain(BufWriter<File>),
986    Gzip(BufWriter<GzEncoder<File>>),
987}
988
989impl FastqWriter {
990    /// Creates a new FASTQ writer for the specified file index.
991    ///
992    /// # Arguments
993    ///
994    /// * `out_dir` - Output directory path
995    /// * `index` - File index number (used in filename)
996    /// * `suffix` - Optional suffix for the filename
997    /// * `gzip_output` - Whether to use gzip compression
998    ///
999    /// # Returns
1000    ///
1001    /// * `anyhow::Result<Self>` - The created writer
1002    ///
1003    /// # Example
1004    ///
1005    /// ```rust, ignore
1006    /// use std::path::Path;
1007    ///
1008    /// let writer = FastqWriter::create(Path::new("output"), 0, "part", true).unwrap();
1009    /// ```
1010    fn create(
1011        out_dir: &Path,
1012        index: usize,
1013        suffix: &str,
1014        gzip_output: bool,
1015    ) -> anyhow::Result<Self> {
1016        let extension = if gzip_output { "fastq.gz" } else { "fastq" };
1017        let path = out_dir.join(format!("tmp_chunk_{index:03}_{suffix}.{extension}"));
1018        let file = File::create(path)?;
1019
1020        if gzip_output {
1021            let encoder = GzEncoder::new(file, Compression::fast());
1022            Ok(FastqWriter::Gzip(BufWriter::new(encoder)))
1023        } else {
1024            Ok(FastqWriter::Plain(BufWriter::new(file)))
1025        }
1026    }
1027
1028    /// Writes a single FASTQ record to the output.
1029    ///
1030    /// # Arguments
1031    ///
1032    /// * `record` - The FastqRecord to write
1033    ///
1034    /// # Returns
1035    ///
1036    /// * `anyhow::Result<()>` - Success or error
1037    ///
1038    /// # Example
1039    ///
1040    /// ```rust, ignore
1041    /// use crate::FastqRecord;
1042    ///
1043    /// let record = FastqRecord {
1044    ///     header: "@seq1".to_string(),
1045    ///     seq: "ACGT".to_string(),
1046    ///     plus: "+".to_string(),
1047    ///     qual: "IIII".to_string(),
1048    /// };
1049    /// // writer.write_record(&record).unwrap();
1050    /// ```
1051    fn write_record(&mut self, record: &FastqRecord) -> anyhow::Result<()> {
1052        match self {
1053            FastqWriter::Plain(writer) => {
1054                writer.write_all(record.header.as_bytes())?;
1055                writer.write_all(b"\n")?;
1056                writer.write_all(record.seq.as_bytes())?;
1057                writer.write_all(b"\n")?;
1058                writer.write_all(record.plus.as_bytes())?;
1059                writer.write_all(b"\n")?;
1060                writer.write_all(record.qual.as_bytes())?;
1061                writer.write_all(b"\n")?;
1062                Ok(())
1063            }
1064            FastqWriter::Gzip(writer) => {
1065                writer.write_all(record.header.as_bytes())?;
1066                writer.write_all(b"\n")?;
1067                writer.write_all(record.seq.as_bytes())?;
1068                writer.write_all(b"\n")?;
1069                writer.write_all(record.plus.as_bytes())?;
1070                writer.write_all(b"\n")?;
1071                writer.write_all(record.qual.as_bytes())?;
1072                writer.write_all(b"\n")?;
1073                Ok(())
1074            }
1075        }
1076    }
1077
1078    /// Finalizes and closes the writer, flushing any buffers.
1079    ///
1080    /// For gzip writers, also finishes the gzip stream.
1081    ///
1082    /// # Returns
1083    ///
1084    /// * `anyhow::Result<()>` - Success or error
1085    ///
1086    /// # Example
1087    ///
1088    /// ```rust, ignore
1089    /// // writer.finalize().unwrap();
1090    /// ```
1091    fn finalize(self) -> anyhow::Result<()> {
1092        match self {
1093            FastqWriter::Plain(mut writer) => {
1094                writer.flush()?;
1095                Ok(())
1096            }
1097            FastqWriter::Gzip(mut writer) => {
1098                writer.flush()?;
1099                let encoder = writer
1100                    .into_inner()
1101                    .map_err(|e| anyhow::anyhow!("ERROR: failed to flush gzip writer: {}", e))?;
1102                let _ = encoder.finish()?;
1103                Ok(())
1104            }
1105        }
1106    }
1107}
1108
1109/// Opens a FASTQ file for reading, handling gzip decompression.
1110///
1111/// # Arguments
1112///
1113/// * `input` - Path to the input FASTQ file
1114///
1115/// # Returns
1116///
1117/// * `anyhow::Result<Box<dyn BufRead>>` - A buffered reader
1118///
1119/// # Example
1120///
1121/// ```rust, ignore
1122/// use std::path::Path;
1123///
1124/// let reader = open_fastq_reader(Path::new("input.fq.gz")).unwrap();
1125/// ```
1126fn open_fastq_reader(input: &Path) -> anyhow::Result<Box<dyn BufRead>> {
1127    let file = File::open(input)?;
1128    if fastq_is_gz(input) {
1129        Ok(Box::new(BufReader::new(MultiGzDecoder::new(file))))
1130    } else {
1131        Ok(Box::new(BufReader::new(file)))
1132    }
1133}
1134
1135/// Reads the next FASTQ record from an iterator of lines.
1136///
1137/// # Arguments
1138///
1139/// * `lines` - Iterator of lines from the FASTQ file
1140///
1141/// # Returns
1142///
1143/// * `anyhow::Result<Option<FastqRecord>>` - The record or None at EOF
1144///
1145/// # Errors
1146///
1147/// Returns an error if the FASTQ format is malformed
1148///
1149/// # Example
1150///
1151/// ```rust, ignore
1152/// use std::io::Cursor;
1153///
1154/// let data = "@seq1\nACGT\n+\nIIII\n@seq2\nTGCA\n+\nJJJJ";
1155/// let mut lines = Cursor::new(data).lines();
1156/// let record = next_fastq_record(&mut lines).unwrap();
1157/// ```
1158fn next_fastq_record<I>(lines: &mut I) -> anyhow::Result<Option<FastqRecord>>
1159where
1160    I: Iterator<Item = std::io::Result<String>>,
1161{
1162    let header = match lines.next() {
1163        Some(h) => h?,
1164        None => return Ok(None),
1165    };
1166
1167    let seq = lines
1168        .next()
1169        .ok_or_else(|| anyhow::anyhow!("ERROR: unexpected EOF in FASTQ record"))??;
1170    let plus = lines
1171        .next()
1172        .ok_or_else(|| anyhow::anyhow!("ERROR: unexpected EOF in FASTQ record"))??;
1173    let qual = lines
1174        .next()
1175        .ok_or_else(|| anyhow::anyhow!("ERROR: unexpected EOF in FASTQ record"))??;
1176
1177    Ok(Some(FastqRecord {
1178        header,
1179        seq,
1180        plus,
1181        qual,
1182    }))
1183}
1184
1185/// Splits a FASTQ file into chunks of a fixed number of records.
1186///
1187/// # Arguments
1188///
1189/// * `input` - Path to the input FASTQ file
1190/// * `out_dir` - Output directory path
1191/// * `records_per_file` - Maximum records per output file
1192/// * `suffix` - Optional suffix for output filenames
1193///
1194/// # Returns
1195///
1196/// * `anyhow::Result<()>` - Success or error
1197///
1198/// # Example
1199///
1200/// ```rust, ignore
1201/// use std::path::Path;
1202///
1203/// split_fastq_by_chunk_size(
1204///     Path::new("input.fq.gz"),
1205///     Path::new("output"),
1206///     1000,
1207///     "part"
1208/// ).unwrap();
1209/// ```
1210fn split_fastq_by_chunk_size(
1211    input: &Path,
1212    out_dir: &Path,
1213    records_per_file: usize,
1214    suffix: &str,
1215) -> anyhow::Result<()> {
1216    create_dir_all(out_dir)?;
1217
1218    let gzip_output = fastq_is_gz(input);
1219    let mut lines = open_fastq_reader(input)?.lines();
1220    let mut writer: Option<FastqWriter> = None;
1221    let mut file_index = 0usize;
1222    let mut record_index = 0usize;
1223
1224    while let Some(record) = next_fastq_record(&mut lines)? {
1225        if writer.is_none() {
1226            writer = Some(FastqWriter::create(
1227                out_dir,
1228                file_index,
1229                suffix,
1230                gzip_output,
1231            )?);
1232        } else if record_index == records_per_file {
1233            if let Some(current_writer) = writer.take() {
1234                current_writer.finalize()?;
1235            }
1236            file_index += 1;
1237            record_index = 0;
1238            writer = Some(FastqWriter::create(
1239                out_dir,
1240                file_index,
1241                suffix,
1242                gzip_output,
1243            )?);
1244        }
1245
1246        if let Some(current_writer) = writer.as_mut() {
1247            current_writer.write_record(&record)?;
1248        }
1249        record_index += 1;
1250    }
1251
1252    if let Some(current_writer) = writer {
1253        current_writer.finalize()?;
1254    }
1255
1256    Ok(())
1257}
1258
1259/// Splits a FASTQ file into a specified number of output files.
1260///
1261/// Distributes records as evenly as possible across the output files.
1262///
1263/// # Arguments
1264///
1265/// * `input` - Path to the input FASTQ file
1266/// * `out_dir` - Output directory path
1267/// * `num_files` - Number of output files to create
1268/// * `total_records` - Total number of records in the input
1269/// * `suffix` - Optional suffix for output filenames
1270///
1271/// # Returns
1272///
1273/// * `anyhow::Result<()>` - Success or error
1274///
1275/// # Example
1276///
1277/// ```rust, ignore
1278/// use std::path::Path;
1279///
1280/// split_fastq_by_num_files(
1281///     Path::new("input.fq.gz"),
1282///     Path::new("output"),
1283///     10,
1284///     5000,
1285///     "part"
1286/// ).unwrap();
1287/// ```
1288fn split_fastq_by_num_files(
1289    input: &Path,
1290    out_dir: &Path,
1291    num_files: usize,
1292    total_records: usize,
1293    suffix: &str,
1294) -> anyhow::Result<()> {
1295    create_dir_all(out_dir)?;
1296
1297    let gzip_output = fastq_is_gz(input);
1298    let mut lines = open_fastq_reader(input)?.lines();
1299
1300    let base_records = total_records / num_files;
1301    let remainder = total_records % num_files;
1302
1303    for file_idx in 0..num_files {
1304        let records_for_file = if file_idx < remainder {
1305            base_records + 1
1306        } else {
1307            base_records
1308        };
1309
1310        let mut writer = FastqWriter::create(out_dir, file_idx, suffix, gzip_output)?;
1311        for _ in 0..records_for_file {
1312            let record = next_fastq_record(&mut lines)?
1313                .ok_or_else(|| anyhow::anyhow!("ERROR: unexpected EOF while splitting FASTQ"))?;
1314            writer.write_record(&record)?;
1315        }
1316        writer.finalize()?;
1317    }
1318
1319    if next_fastq_record(&mut lines)?.is_some() {
1320        anyhow::bail!("ERROR: FASTQ reader still has remaining records after splitting");
1321    }
1322
1323    Ok(())
1324}
1325
1326/// Counts the total number of records in a FASTQ file.
1327///
1328/// FASTQ files have 4 lines per record, so this counts lines and divides by 4.
1329///
1330/// # Arguments
1331///
1332/// * `input` - Path to the input FASTQ file
1333///
1334/// # Returns
1335///
1336/// * `anyhow::Result<usize>` - The total number of records
1337///
1338/// # Errors
1339///
1340/// Returns an error if the file is malformed (line count not divisible by 4)
1341///
1342/// # Example
1343///
1344/// ```rust, ignore
1345/// use std::path::Path;
1346///
1347/// let count = count_fastq_records(Path::new("input.fq.gz")).unwrap();
1348/// ```
1349fn count_fastq_records(input: &Path) -> anyhow::Result<usize> {
1350    let mut lines = open_fastq_reader(input)?.lines();
1351    let mut line_count = 0usize;
1352
1353    while let Some(line) = lines.next() {
1354        line?;
1355        line_count += 1;
1356    }
1357
1358    if !line_count.is_multiple_of(4) {
1359        anyhow::bail!(
1360            "ERROR: malformed FASTQ input: line count {} is not divisible by 4",
1361            line_count
1362        );
1363    }
1364
1365    Ok(line_count / 4)
1366}