flow_fcs/
write.rs

1//! FCS file writing utilities
2//!
3//! This module provides functionality to write FCS files to disk, including:
4//! - Duplicating existing files
5//! - Editing metadata and persisting changes
6//! - Creating new FCS files with data modifications (filtering, concatenation, column addition)
7//!
8//! ## Memory-Mapping Implications
9//!
10//! **Important**: When writing FCS files, the original memory-mapped file is not modified.
11//! All write operations create new files. The original `Fcs` struct remains valid and
12//! can continue to access the original file via memory-mapping until it's dropped.
13//!
14//! When you call `write_fcs_file()` or any of the modification functions:
15//! 1. The data is read from the DataFrame (which is already in memory)
16//! 2. A new file is created on disk
17//! 3. The original memory-mapped file remains unchanged
18//!
19//! This means:
20//! - You can safely write modified versions without affecting the original
21//! - The original `Fcs` struct can still be used after writing
22//! - No special handling is needed to "close" or "unmap" before writing
23//! - Multiple writes can happen concurrently from the same source file
24
25use crate::{
26    Fcs,
27    byteorder::ByteOrder,
28    datatype::FcsDataType,
29    header::Header,
30    keyword::{IntegerKeyword, Keyword, StringKeyword},
31    metadata::Metadata,
32    parameter::ParameterMap,
33    version::Version,
34};
35use anyhow::{Result, anyhow};
36use byteorder::{LittleEndian, WriteBytesExt};
37use polars::prelude::*;
38use std::fs::File;
39use std::io::Write;
40use std::path::Path;
41use std::sync::Arc;
42
43/// Write an FCS file to disk
44///
45/// **Important**: This function closes the memory-mapped file before writing.
46/// The original Fcs struct will no longer be able to access the original file
47/// after this operation, but the data is preserved in the DataFrame.
48///
49/// # Arguments
50/// * `fcs` - The FCS struct to write (will consume the struct)
51/// * `path` - Output file path
52///
53/// # Errors
54/// Returns an error if:
55/// - The path is invalid
56/// - The file cannot be written
57/// - Metadata cannot be serialized
58pub fn write_fcs_file(fcs: Fcs, path: impl AsRef<Path>) -> Result<()> {
59    let path = path.as_ref();
60
61    // Validate file extension
62    if path.extension().and_then(|s| s.to_str()) != Some("fcs") {
63        return Err(anyhow!("Output file must have .fcs extension"));
64    }
65
66    // Get data from DataFrame
67    let df = &*fcs.data_frame;
68    let n_events = df.height();
69    let n_params = df.width();
70
71    if n_events == 0 {
72        return Err(anyhow!("Cannot write FCS file with 0 events"));
73    }
74    if n_params == 0 {
75        return Err(anyhow!("Cannot write FCS file with 0 parameters"));
76    }
77
78    // Serialize data segment first (we need its size for metadata)
79    let data_segment = serialize_data(df, &fcs.metadata)?;
80
81    // Calculate offsets
82    let header_size = 58;
83    let text_start = header_size;
84    // Estimate text segment size (will recalculate after)
85    let estimated_text_size = estimate_text_segment_size(&fcs.metadata, n_events, n_params);
86    let estimated_text_end = text_start + estimated_text_size - 1;
87    let data_start = estimated_text_end + 1;
88    let data_end = data_start + data_segment.len() - 1;
89
90    // Serialize metadata to text segment (now we know data offsets)
91    let text_segment = serialize_metadata(&fcs.metadata, n_events, n_params, data_start, data_end)?;
92
93    // Recalculate offsets with actual text segment size
94    let text_end = text_start + text_segment.len() - 1;
95    let data_start = text_end + 1;
96    let data_end = data_start + data_segment.len() - 1;
97
98    // Build header
99    let header = build_header(
100        &fcs.header.version,
101        text_start,
102        text_end,
103        data_start,
104        data_end,
105    )?;
106
107    // Write file
108    let mut file = File::create(path)?;
109    file.write_all(&header)?;
110    file.write_all(&text_segment)?;
111    file.write_all(&data_segment)?;
112    file.sync_all()?;
113
114    Ok(())
115}
116
117/// Duplicate an existing FCS file to a new path
118///
119/// This creates an exact copy of the file on disk. The original Fcs struct
120/// remains valid and can continue to be used.
121///
122/// # Arguments
123/// * `fcs` - Reference to the FCS struct to duplicate
124/// * `path` - Output file path
125///
126/// # Errors
127/// Returns an error if the file cannot be written
128pub fn duplicate_fcs_file(fcs: &Fcs, path: impl AsRef<Path>) -> Result<()> {
129    use std::fs;
130
131    let path = path.as_ref();
132
133    // Simply copy the file on disk
134    fs::copy(&fcs.file_access.path, path)?;
135
136    Ok(())
137}
138
139/// Edit metadata and persist changes to disk
140///
141/// This function:
142/// 1. Updates the metadata in the Fcs struct
143/// 2. Writes the modified file to disk
144/// 3. Returns a new Fcs struct pointing to the new file
145///
146/// **Note**: The original file is not modified. A new file is created.
147///
148/// # Arguments
149/// * `fcs` - The FCS struct to modify
150/// * `path` - Output file path for the modified file
151/// * `updates` - Function that modifies the metadata
152///
153/// # Errors
154/// Returns an error if the file cannot be written
155pub fn edit_metadata_and_save<F>(mut fcs: Fcs, path: impl AsRef<Path>, updates: F) -> Result<Fcs>
156where
157    F: FnOnce(&mut Metadata),
158{
159    // Apply updates to metadata
160    updates(&mut fcs.metadata);
161
162    // Update $TOT if event count changed
163    let n_events = fcs.get_event_count_from_dataframe();
164    use crate::keyword::match_and_parse_keyword;
165    let tot_keyword = match_and_parse_keyword("$TOT", &n_events.to_string());
166    if let crate::keyword::KeywordCreationResult::Int(int_kw) = tot_keyword {
167        fcs.metadata
168            .keywords
169            .insert("$TOT".to_string(), Keyword::Int(int_kw));
170    }
171
172    // Write to new file
173    write_fcs_file(fcs.clone(), &path)?;
174
175    // Open the new file
176    Fcs::open(
177        path.as_ref()
178            .to_str()
179            .ok_or_else(|| anyhow!("Invalid path"))?,
180    )
181}
182
183/// Create a new FCS file by filtering events
184///
185/// Removes events where `mask[i] == false`. The mask must have the same length
186/// as the number of events in the original file.
187///
188/// # Arguments
189/// * `fcs` - The FCS struct to filter
190/// * `path` - Output file path
191/// * `mask` - Boolean mask (true = keep, false = remove)
192///
193/// # Errors
194/// Returns an error if:
195/// - The mask length doesn't match the number of events
196/// - The file cannot be written
197pub fn filter_events(fcs: Fcs, path: impl AsRef<Path>, mask: &[bool]) -> Result<Fcs> {
198    let df = &*fcs.data_frame;
199    let n_events = df.height();
200
201    if mask.len() != n_events {
202        return Err(anyhow!(
203            "Mask length {} doesn't match number of events {}",
204            mask.len(),
205            n_events
206        ));
207    }
208
209    // Filter DataFrame using Polars
210    let mask_vec: Vec<bool> = mask.to_vec();
211    let mask_series = Series::new("mask".into(), mask_vec);
212    let mask_ca = mask_series.bool()?;
213    let filtered_df = df.filter(&mask_ca)?;
214
215    // Create new Fcs with filtered data
216    let mut new_fcs = fcs.clone();
217    new_fcs.data_frame = Arc::new(filtered_df);
218
219    // Update metadata
220    let n_events_after = new_fcs.get_event_count_from_dataframe();
221    use crate::keyword::match_and_parse_keyword;
222    let tot_keyword = match_and_parse_keyword("$TOT", &n_events_after.to_string());
223    if let crate::keyword::KeywordCreationResult::Int(int_kw) = tot_keyword {
224        new_fcs
225            .metadata
226            .keywords
227            .insert("$TOT".to_string(), Keyword::Int(int_kw));
228    }
229
230    // Write to file
231    write_fcs_file(new_fcs.clone(), &path)?;
232
233    // Open the new file
234    Fcs::open(
235        path.as_ref()
236            .to_str()
237            .ok_or_else(|| anyhow!("Invalid path"))?,
238    )
239}
240
241/// Create a new FCS file by concatenating events from multiple files
242///
243/// All files must have the same parameters (same names and order).
244///
245/// # Arguments
246/// * `files` - Vector of FCS structs to concatenate
247/// * `path` - Output file path
248///
249/// # Errors
250/// Returns an error if:
251/// - Files have different parameters
252/// - The file cannot be written
253pub fn concatenate_events(files: Vec<Fcs>, path: impl AsRef<Path>) -> Result<Fcs> {
254    if files.is_empty() {
255        return Err(anyhow!("Cannot concatenate empty list of files"));
256    }
257
258    if files.len() == 1 {
259        // Just duplicate the single file
260        return duplicate_fcs_file(&files[0], &path).and_then(|_| {
261            Fcs::open(
262                path.as_ref()
263                    .to_str()
264                    .ok_or_else(|| anyhow!("Invalid path"))?,
265            )
266        });
267    }
268
269    // Verify all files have the same parameters
270    let first_params: Vec<String> = files[0].get_parameter_names_from_dataframe();
271
272    for (idx, fcs) in files.iter().enumerate().skip(1) {
273        let params: Vec<String> = fcs.get_parameter_names_from_dataframe();
274        if params != first_params {
275            return Err(anyhow!("File {} has different parameters than file 0", idx));
276        }
277    }
278
279    // Concatenate DataFrames using vstack
280    let dfs: Vec<DataFrame> = files.iter().map(|f| (*f.data_frame).clone()).collect();
281    let concatenated_df = dfs
282        .into_iter()
283        .reduce(|acc, df| acc.vstack(&df).unwrap_or(acc))
284        .unwrap();
285
286    // Create new Fcs using first file as template
287    let mut new_fcs = files[0].clone();
288    new_fcs.data_frame = Arc::new(concatenated_df);
289
290    // Update metadata
291    let n_events_after = new_fcs.get_event_count_from_dataframe();
292    use crate::keyword::match_and_parse_keyword;
293    let tot_keyword = match_and_parse_keyword("$TOT", &n_events_after.to_string());
294    if let crate::keyword::KeywordCreationResult::Int(int_kw) = tot_keyword {
295        new_fcs
296            .metadata
297            .keywords
298            .insert("$TOT".to_string(), Keyword::Int(int_kw));
299    }
300
301    // Generate new GUID
302    new_fcs.metadata.validate_guid();
303
304    // Write to file
305    write_fcs_file(new_fcs.clone(), &path)?;
306
307    // Open the new file
308    Fcs::open(
309        path.as_ref()
310            .to_str()
311            .ok_or_else(|| anyhow!("Invalid path"))?,
312    )
313}
314
315/// Create a new FCS file by adding a column (parameter) to existing data
316///
317/// This is useful for adding QC results (e.g., a boolean column indicating
318/// good/bad events) or other event-level annotations.
319///
320/// # Arguments
321/// * `fcs` - The FCS struct to modify
322/// * `path` - Output file path
323/// * `column_name` - Name of the new parameter
324/// * `values` - Values for the new parameter (must match number of events)
325///
326/// # Errors
327/// Returns an error if:
328/// - The values length doesn't match the number of events
329/// - The column name already exists
330/// - The file cannot be written
331pub fn add_column(
332    mut fcs: Fcs,
333    path: impl AsRef<Path>,
334    column_name: &str,
335    values: Vec<f32>,
336) -> Result<Fcs> {
337    let df = &*fcs.data_frame;
338    let n_events = df.height();
339
340    if values.len() != n_events {
341        return Err(anyhow!(
342            "Values length {} doesn't match number of events {}",
343            values.len(),
344            n_events
345        ));
346    }
347
348    // Check if column already exists
349    if df
350        .get_column_names()
351        .iter()
352        .any(|&name| name == column_name)
353    {
354        return Err(anyhow!("Column {} already exists", column_name));
355    }
356
357    // Add column to DataFrame
358    let mut new_df = df.clone();
359    let new_series = Series::new(column_name.into(), values);
360    new_df
361        .with_column(new_series)
362        .map_err(|e| anyhow!("Failed to add column: {}", e))?;
363
364    // Update Fcs struct
365    fcs.data_frame = Arc::new(new_df);
366
367    // Add parameter metadata
368    let n_params = fcs.get_parameter_count_from_dataframe();
369    let param_num = n_params; // 1-based indexing in FCS
370
371    // Update $PAR keyword
372    use crate::keyword::match_and_parse_keyword;
373    let par_keyword = match_and_parse_keyword("$PAR", &n_params.to_string());
374    if let crate::keyword::KeywordCreationResult::Int(int_kw) = par_keyword {
375        fcs.metadata
376            .keywords
377            .insert("$PAR".to_string(), Keyword::Int(int_kw));
378    }
379
380    // Add parameter keywords ($PnN, $PnB, etc.)
381    fcs.metadata
382        .insert_string_keyword(format!("$P{}N", param_num), column_name.to_string());
383
384    // Default: 32 bits (4 bytes) for float32
385    let pnb_keyword = match_and_parse_keyword(&format!("$P{}B", param_num), "32");
386    if let crate::keyword::KeywordCreationResult::Int(int_kw) = pnb_keyword {
387        fcs.metadata
388            .keywords
389            .insert(format!("$P{}B", param_num), Keyword::Int(int_kw));
390    }
391
392    // Default range
393    let pnr_keyword = match_and_parse_keyword(&format!("$P{}R", param_num), "262144");
394    if let crate::keyword::KeywordCreationResult::Int(int_kw) = pnr_keyword {
395        fcs.metadata
396            .keywords
397            .insert(format!("$P{}R", param_num), Keyword::Int(int_kw));
398    }
399
400    // Default amplification
401    fcs.metadata
402        .insert_string_keyword(format!("$P{}E", param_num), "0,0".to_string());
403
404    // Add to parameter map
405    use crate::TransformType;
406    use crate::parameter::Parameter;
407    fcs.parameters.insert(
408        column_name.to_string().into(),
409        Parameter::new(&param_num, column_name, column_name, &TransformType::Linear),
410    );
411
412    // Write to file
413    write_fcs_file(fcs.clone(), &path)?;
414
415    // Open the new file
416    Fcs::open(
417        path.as_ref()
418            .to_str()
419            .ok_or_else(|| anyhow!("Invalid path"))?,
420    )
421}
422
423// ==================== Internal Helper Functions ====================
424
425fn estimate_text_segment_size(metadata: &Metadata, n_events: usize, n_params: usize) -> usize {
426    // Rough estimate: base size + keywords
427    let base_size = 200; // Base keywords
428    let keyword_size = metadata.keywords.len() * 50; // Average keyword size
429    let param_keywords = n_params * 100; // Parameter keywords
430    base_size + keyword_size + param_keywords
431}
432
433fn serialize_metadata(
434    metadata: &Metadata,
435    n_events: usize,
436    n_params: usize,
437    data_start: usize,
438    data_end: usize,
439) -> Result<Vec<u8>> {
440    let delimiter = metadata.delimiter as u8;
441    let mut text_segment = Vec::new();
442
443    // Helper to add keyword-value pair
444    let mut add_keyword = |key: &str, value: &str| {
445        text_segment.push(delimiter);
446        text_segment.extend_from_slice(format!("${}", key).as_bytes());
447        text_segment.push(delimiter);
448        text_segment.extend_from_slice(value.as_bytes());
449    };
450
451    // Required keywords (order matters for FCS compatibility)
452    add_keyword("BEGINANALYSIS", "0");
453    add_keyword("ENDANALYSIS", "0");
454    add_keyword("BEGINSTEXT", "0");
455    add_keyword("ENDSTEXT", "0");
456    add_keyword("BEGINDATA", &data_start.to_string());
457    add_keyword("ENDDATA", &data_end.to_string());
458
459    // Serialize all keywords from metadata
460    let mut sorted_keys: Vec<_> = metadata.keywords.keys().collect();
461    sorted_keys.sort();
462
463    for key in sorted_keys {
464        // Skip keywords we've already added
465        if matches!(
466            key.as_str(),
467            "$BEGINANALYSIS"
468                | "$ENDANALYSIS"
469                | "$BEGINSTEXT"
470                | "$ENDSTEXT"
471                | "$BEGINDATA"
472                | "$ENDDATA"
473        ) {
474            continue;
475        }
476
477        let keyword = metadata.keywords.get(key).unwrap();
478        let value_str = match keyword {
479            Keyword::Int(int_kw) => match int_kw {
480                IntegerKeyword::TOT(_) => {
481                    // Use actual event count
482                    n_events.to_string()
483                }
484                IntegerKeyword::PAR(_) => {
485                    // Use actual parameter count
486                    n_params.to_string()
487                }
488                _ => int_kw.to_string(),
489            },
490            Keyword::String(str_kw) => str_kw.to_string(),
491            Keyword::Float(float_kw) => float_kw.to_string(),
492            Keyword::Byte(byte_kw) => byte_kw.to_string(),
493            Keyword::Mixed(mixed_kw) => mixed_kw.to_string(),
494        };
495
496        // Remove $ prefix for serialization (it will be added back)
497        let key_without_prefix = key.strip_prefix('$').unwrap_or(key);
498        add_keyword(key_without_prefix, &value_str);
499    }
500
501    Ok(text_segment)
502}
503
504fn serialize_data(df: &DataFrame, metadata: &Metadata) -> Result<Vec<u8>> {
505    let n_events = df.height();
506    let n_params = df.width();
507
508    // Get bytes per parameter from metadata
509    let bytes_per_param = metadata
510        .calculate_bytes_per_event()
511        .map(|bytes_per_event| bytes_per_event / n_params)
512        .unwrap_or(4); // Default to 4 bytes (float32)
513
514    let mut data = Vec::with_capacity(n_events * n_params * bytes_per_param);
515
516    // Get byte order
517    let byte_order = metadata
518        .get_byte_order()
519        .unwrap_or(&ByteOrder::LittleEndian);
520    let is_little_endian = matches!(byte_order, ByteOrder::LittleEndian);
521
522    // Serialize row by row (FCS format: event1_param1, event1_param2, ..., event2_param1, ...)
523    // Get all columns as f32 slices for efficient access
524    let column_names = df.get_column_names();
525    let mut column_data: Vec<&[f32]> = Vec::with_capacity(n_params);
526
527    for col_name in &column_names {
528        let series = df.column(col_name)?;
529        let f32_series = series
530            .f32()
531            .map_err(|e| anyhow!("Column {} is not f32: {}", col_name, e))?;
532        let slice = f32_series
533            .cont_slice()
534            .map_err(|e| anyhow!("Column {} data is not contiguous: {}", col_name, e))?;
535        column_data.push(slice);
536    }
537
538    // Write row by row
539    for row_idx in 0..n_events {
540        for col_data in &column_data {
541            let value = col_data[row_idx];
542
543            // Write as float32 (4 bytes)
544            if is_little_endian {
545                data.write_f32::<LittleEndian>(value)?;
546            } else {
547                use byteorder::BigEndian;
548                data.write_f32::<BigEndian>(value)?;
549            }
550        }
551    }
552
553    Ok(data)
554}
555
556fn build_header(
557    version: &Version,
558    text_start: usize,
559    text_end: usize,
560    data_start: usize,
561    data_end: usize,
562) -> Result<Vec<u8>> {
563    let mut header = vec![0u8; 58];
564
565    // Version string (bytes 0-5)
566    let version_str = format!("{}", version);
567    if version_str.len() > 6 {
568        return Err(anyhow!("Version string too long: {}", version_str));
569    }
570    header[0..version_str.len()].copy_from_slice(version_str.as_bytes());
571
572    // 4 spaces (bytes 6-9)
573    header[6..10].fill(b' ');
574
575    // Text segment offsets (bytes 10-17 and 18-25) - right-aligned, space-padded
576    let text_start_str = format!("{:>8}", text_start);
577    header[10..18].copy_from_slice(text_start_str.as_bytes());
578    let text_end_str = format!("{:>8}", text_end);
579    header[18..26].copy_from_slice(text_end_str.as_bytes());
580
581    // Data segment offsets (bytes 26-33 and 34-41)
582    let data_start_str = format!("{:>8}", data_start);
583    header[26..34].copy_from_slice(data_start_str.as_bytes());
584    let data_end_str = format!("{:>8}", data_end);
585    header[34..42].copy_from_slice(data_end_str.as_bytes());
586
587    // Analysis segment offsets (bytes 42-49 and 50-57) - set to 0
588    header[42..50].copy_from_slice(b"       0");
589    header[50..58].copy_from_slice(b"       0");
590
591    Ok(header)
592}