// flow_fcs/write.rs
1//! FCS file writing utilities
2//!
3//! This module provides functionality to write FCS files to disk, including:
4//! - Duplicating existing files
5//! - Editing metadata and persisting changes
6//! - Creating new FCS files with data modifications (filtering, concatenation, column addition)
7//!
8//! ## Memory-Mapping Implications
9//!
10//! **Important**: When writing FCS files, the original memory-mapped file is not modified.
11//! All write operations create new files. The original `Fcs` struct remains valid and
12//! can continue to access the original file via memory-mapping until it's dropped.
13//!
14//! When you call `write_fcs_file()` or any of the modification functions:
15//! 1. The data is read from the DataFrame (which is already in memory)
16//! 2. A new file is created on disk
17//! 3. The original memory-mapped file remains unchanged
18//!
19//! This means:
20//! - You can safely write modified versions without affecting the original
21//! - The original `Fcs` struct can still be used after writing
22//! - No special handling is needed to "close" or "unmap" before writing
23//! - Multiple writes can happen concurrently from the same source file
24
25use crate::{
26    Fcs,
27    byteorder::ByteOrder,
28    keyword::{ByteKeyword, IntegerKeyword, Keyword, StringableKeyword},
29    metadata::Metadata,
30    version::Version,
31};
32use anyhow::{Result, anyhow};
33use byteorder::{LittleEndian, WriteBytesExt};
34use polars::prelude::*;
35use std::fs::File;
36use std::io::Write;
37use std::path::Path;
38use std::sync::Arc;
39
40/// Write an FCS file to disk
41///
42/// **Important**: This function closes the memory-mapped file before writing.
43/// The original Fcs struct will no longer be able to access the original file
44/// after this operation, but the data is preserved in the DataFrame.
45///
46/// # Arguments
47/// * `fcs` - The FCS struct to write (will consume the struct)
48/// * `path` - Output file path
49///
50/// # Errors
51/// Returns an error if:
52/// - The path is invalid
53/// - The file cannot be written
54/// - Metadata cannot be serialized
55pub fn write_fcs_file(fcs: Fcs, path: impl AsRef<Path>) -> Result<()> {
56    let path = path.as_ref();
57
58    // Validate file extension
59    if path.extension().and_then(|s| s.to_str()) != Some("fcs") {
60        return Err(anyhow!("Output file must have .fcs extension"));
61    }
62
63    // Get data from DataFrame
64    let df = &*fcs.data_frame;
65    let n_events = df.height();
66    let n_params = df.width();
67
68    if n_events == 0 {
69        return Err(anyhow!("Cannot write FCS file with 0 events"));
70    }
71    if n_params == 0 {
72        return Err(anyhow!("Cannot write FCS file with 0 parameters"));
73    }
74
75    // Serialize data segment first (we need its size for metadata)
76    let data_segment = serialize_data(df, &fcs.metadata)?;
77
78    // Calculate offsets
79    let header_size = 58;
80    let text_start = header_size;
81    // Estimate text segment size (will recalculate after)
82    let estimated_text_size = estimate_text_segment_size(&fcs.metadata, n_events, n_params);
83    let estimated_text_end = text_start + estimated_text_size - 1;
84    let data_start = estimated_text_end + 1;
85    let data_end = data_start + data_segment.len() - 1;
86
87    // Serialize metadata to text segment (now we know data offsets)
88    let text_segment = serialize_metadata(&fcs.metadata, n_events, n_params, data_start, data_end)?;
89
90    // Recalculate offsets with actual text segment size
91    let text_end = text_start + text_segment.len() - 1;
92    let data_start = text_end + 1;
93    let data_end = data_start + data_segment.len() - 1;
94
95    // Build header
96    let header = build_header(
97        &fcs.header.version,
98        text_start,
99        text_end,
100        data_start,
101        data_end,
102    )?;
103
104    // Write file
105    let mut file = File::create(path)?;
106    file.write_all(&header)?;
107    file.write_all(&text_segment)?;
108    file.write_all(&data_segment)?;
109    file.sync_all()?;
110
111    Ok(())
112}
113
114/// Duplicate an existing FCS file to a new path
115///
116/// This creates an exact copy of the file on disk. The original Fcs struct
117/// remains valid and can continue to be used.
118///
119/// # Arguments
120/// * `fcs` - Reference to the FCS struct to duplicate
121/// * `path` - Output file path
122///
123/// # Errors
124/// Returns an error if the file cannot be written
125pub fn duplicate_fcs_file(fcs: &Fcs, path: impl AsRef<Path>) -> Result<()> {
126    use std::fs;
127
128    let path = path.as_ref();
129
130    // Simply copy the file on disk
131    fs::copy(&fcs.file_access.path, path)?;
132
133    Ok(())
134}
135
136/// Edit metadata and persist changes to disk
137///
138/// This function:
139/// 1. Updates the metadata in the Fcs struct
140/// 2. Writes the modified file to disk
141/// 3. Returns a new Fcs struct pointing to the new file
142///
143/// **Note**: The original file is not modified. A new file is created.
144///
145/// # Arguments
146/// * `fcs` - The FCS struct to modify
147/// * `path` - Output file path for the modified file
148/// * `updates` - Function that modifies the metadata
149///
150/// # Errors
151/// Returns an error if the file cannot be written
152pub fn edit_metadata_and_save<F>(mut fcs: Fcs, path: impl AsRef<Path>, updates: F) -> Result<Fcs>
153where
154    F: FnOnce(&mut Metadata),
155{
156    // Apply updates to metadata
157    updates(&mut fcs.metadata);
158
159    // Update $TOT if event count changed
160    let n_events = fcs.get_event_count_from_dataframe();
161    use crate::keyword::match_and_parse_keyword;
162    let tot_keyword = match_and_parse_keyword("$TOT", &n_events.to_string());
163    if let crate::keyword::KeywordCreationResult::Int(int_kw) = tot_keyword {
164        fcs.metadata
165            .keywords
166            .insert("$TOT".to_string(), Keyword::Int(int_kw));
167    }
168
169    // Write to new file
170    write_fcs_file(fcs.clone(), &path)?;
171
172    // Open the new file
173    Fcs::open(
174        path.as_ref()
175            .to_str()
176            .ok_or_else(|| anyhow!("Invalid path"))?,
177    )
178}
179
180/// Create a new FCS file by filtering events
181///
182/// Removes events where `mask[i] == false`. The mask must have the same length
183/// as the number of events in the original file.
184///
185/// # Arguments
186/// * `fcs` - The FCS struct to filter
187/// * `path` - Output file path
188/// * `mask` - Boolean mask (true = keep, false = remove)
189///
190/// # Errors
191/// Returns an error if:
192/// - The mask length doesn't match the number of events
193/// - The file cannot be written
194pub fn filter_events(fcs: Fcs, path: impl AsRef<Path>, mask: &[bool]) -> Result<Fcs> {
195    let df = &*fcs.data_frame;
196    let n_events = df.height();
197
198    if mask.len() != n_events {
199        return Err(anyhow!(
200            "Mask length {} doesn't match number of events {}",
201            mask.len(),
202            n_events
203        ));
204    }
205
206    // Filter DataFrame using Polars
207    let mask_vec: Vec<bool> = mask.to_vec();
208    let mask_series = Series::new("mask".into(), mask_vec);
209    let mask_ca = mask_series.bool()?;
210    let filtered_df = df.filter(&mask_ca)?;
211
212    // Create new Fcs with filtered data
213    let mut new_fcs = fcs.clone();
214    new_fcs.data_frame = Arc::new(filtered_df);
215
216    // Update metadata
217    let n_events_after = new_fcs.get_event_count_from_dataframe();
218    use crate::keyword::match_and_parse_keyword;
219    let tot_keyword = match_and_parse_keyword("$TOT", &n_events_after.to_string());
220    if let crate::keyword::KeywordCreationResult::Int(int_kw) = tot_keyword {
221        new_fcs
222            .metadata
223            .keywords
224            .insert("$TOT".to_string(), Keyword::Int(int_kw));
225    }
226
227    // Write to file
228    write_fcs_file(new_fcs.clone(), &path)?;
229
230    // Open the new file
231    Fcs::open(
232        path.as_ref()
233            .to_str()
234            .ok_or_else(|| anyhow!("Invalid path"))?,
235    )
236}
237
238/// Create a new FCS file by concatenating events from multiple files
239///
240/// All files must have the same parameters (same names and order).
241///
242/// # Arguments
243/// * `files` - Vector of FCS structs to concatenate
244/// * `path` - Output file path
245///
246/// # Errors
247/// Returns an error if:
248/// - Files have different parameters
249/// - The file cannot be written
250pub fn concatenate_events(files: Vec<Fcs>, path: impl AsRef<Path>) -> Result<Fcs> {
251    if files.is_empty() {
252        return Err(anyhow!("Cannot concatenate empty list of files"));
253    }
254
255    if files.len() == 1 {
256        // Just duplicate the single file
257        return duplicate_fcs_file(&files[0], &path).and_then(|_| {
258            Fcs::open(
259                path.as_ref()
260                    .to_str()
261                    .ok_or_else(|| anyhow!("Invalid path"))?,
262            )
263        });
264    }
265
266    // Verify all files have the same parameters
267    let first_params: Vec<String> = files[0].get_parameter_names_from_dataframe();
268
269    for (idx, fcs) in files.iter().enumerate().skip(1) {
270        let params: Vec<String> = fcs.get_parameter_names_from_dataframe();
271        if params != first_params {
272            return Err(anyhow!("File {} has different parameters than file 0", idx));
273        }
274    }
275
276    // Concatenate DataFrames using vstack
277    let dfs: Vec<DataFrame> = files.iter().map(|f| (*f.data_frame).clone()).collect();
278    let concatenated_df = dfs
279        .into_iter()
280        .reduce(|acc, df| acc.vstack(&df).unwrap_or(acc))
281        .ok_or_else(|| anyhow!("No files to concatenate"))?;
282
283    // Create new Fcs using first file as template
284    let mut new_fcs = files[0].clone();
285    new_fcs.data_frame = Arc::new(concatenated_df);
286
287    // Update metadata
288    let n_events_after = new_fcs.get_event_count_from_dataframe();
289    use crate::keyword::match_and_parse_keyword;
290    let tot_keyword = match_and_parse_keyword("$TOT", &n_events_after.to_string());
291    if let crate::keyword::KeywordCreationResult::Int(int_kw) = tot_keyword {
292        new_fcs
293            .metadata
294            .keywords
295            .insert("$TOT".to_string(), Keyword::Int(int_kw));
296    }
297
298    // Generate new GUID
299    new_fcs.metadata.validate_guid();
300
301    // Write to file
302    write_fcs_file(new_fcs.clone(), &path)?;
303
304    // Open the new file
305    Fcs::open(
306        path.as_ref()
307            .to_str()
308            .ok_or_else(|| anyhow!("Invalid path"))?,
309    )
310}
311
312/// Create a new FCS file by adding a column (parameter) to existing data
313///
314/// This is useful for adding QC results (e.g., a boolean column indicating
315/// good/bad events) or other event-level annotations.
316///
317/// # Arguments
318/// * `fcs` - The FCS struct to modify
319/// * `path` - Output file path
320/// * `column_name` - Name of the new parameter
321/// * `values` - Values for the new parameter (must match number of events)
322///
323/// # Errors
324/// Returns an error if:
325/// - The values length doesn't match the number of events
326/// - The column name already exists
327/// - The file cannot be written
328pub fn add_column(
329    mut fcs: Fcs,
330    path: impl AsRef<Path>,
331    column_name: &str,
332    values: Vec<f32>,
333) -> Result<Fcs> {
334    let df = &*fcs.data_frame;
335    let n_events = df.height();
336
337    if values.len() != n_events {
338        return Err(anyhow!(
339            "Values length {} doesn't match number of events {}",
340            values.len(),
341            n_events
342        ));
343    }
344
345    // Check if column already exists
346    if df
347        .get_column_names()
348        .iter()
349        .any(|&name| name == column_name)
350    {
351        return Err(anyhow!("Column {} already exists", column_name));
352    }
353
354    // Add column to DataFrame
355    let mut new_df = df.clone();
356    let new_series = Series::new(column_name.into(), values);
357    new_df
358        .with_column(new_series.into())
359        .map_err(|e| anyhow!("Failed to add column: {}", e))?;
360
361    // Update Fcs struct
362    fcs.data_frame = Arc::new(new_df);
363
364    // Add parameter metadata
365    let n_params = fcs.get_parameter_count_from_dataframe();
366    let param_num = n_params; // 1-based indexing in FCS
367
368    // Update $PAR keyword
369    use crate::keyword::match_and_parse_keyword;
370    let par_keyword = match_and_parse_keyword("$PAR", &n_params.to_string());
371    if let crate::keyword::KeywordCreationResult::Int(int_kw) = par_keyword {
372        fcs.metadata
373            .keywords
374            .insert("$PAR".to_string(), Keyword::Int(int_kw));
375    }
376
377    // Add parameter keywords ($PnN, $PnB, etc.)
378    fcs.metadata
379        .insert_string_keyword(format!("$P{}N", param_num), column_name.to_string());
380
381    // Default: 32 bits (4 bytes) for float32
382    let pnb_keyword = match_and_parse_keyword(&format!("$P{}B", param_num), "32");
383    if let crate::keyword::KeywordCreationResult::Int(int_kw) = pnb_keyword {
384        fcs.metadata
385            .keywords
386            .insert(format!("$P{}B", param_num), Keyword::Int(int_kw));
387    }
388
389    // Default range
390    let pnr_keyword = match_and_parse_keyword(&format!("$P{}R", param_num), "262144");
391    if let crate::keyword::KeywordCreationResult::Int(int_kw) = pnr_keyword {
392        fcs.metadata
393            .keywords
394            .insert(format!("$P{}R", param_num), Keyword::Int(int_kw));
395    }
396
397    // Default amplification
398    fcs.metadata
399        .insert_string_keyword(format!("$P{}E", param_num), "0,0".to_string());
400
401    // Add to parameter map
402    use crate::TransformType;
403    use crate::parameter::Parameter;
404    fcs.parameters.insert(
405        column_name.to_string().into(),
406        Parameter::new(&param_num, column_name, column_name, &TransformType::Linear),
407    );
408
409    // Write to file
410    write_fcs_file(fcs.clone(), &path)?;
411
412    // Open the new file
413    Fcs::open(
414        path.as_ref()
415            .to_str()
416            .ok_or_else(|| anyhow!("Invalid path"))?,
417    )
418}
419
420// ==================== Internal Helper Functions ====================
421
422fn estimate_text_segment_size(metadata: &Metadata, _n_events: usize, n_params: usize) -> usize {
423    // Rough estimate: base size + keywords
424    let base_size = 200; // Base keywords
425    let keyword_size = metadata.keywords.len() * 50; // Average keyword size
426    let param_keywords = n_params * 100; // Parameter keywords
427    base_size + keyword_size + param_keywords
428}
429
430fn serialize_metadata(
431    metadata: &Metadata,
432    n_events: usize,
433    n_params: usize,
434    data_start: usize,
435    data_end: usize,
436) -> Result<Vec<u8>> {
437    let delimiter = metadata.delimiter as u8;
438    let mut text_segment = Vec::new();
439
440    // Helper to add keyword-value pair
441    let mut add_keyword = |key: &str, value: &str| {
442        text_segment.push(delimiter);
443        text_segment.extend_from_slice(format!("${}", key).as_bytes());
444        text_segment.push(delimiter);
445        text_segment.extend_from_slice(value.as_bytes());
446    };
447
448    // Required keywords (order matters for FCS compatibility)
449    // Write these first, then metadata keywords will be added (some may overwrite these)
450    add_keyword("BEGINANALYSIS", "0");
451    add_keyword("ENDANALYSIS", "0");
452    add_keyword("BEGINSTEXT", "0");
453    add_keyword("ENDSTEXT", "0");
454    add_keyword("BEGINDATA", &data_start.to_string());
455    add_keyword("ENDDATA", &data_end.to_string());
456
457    // Ensure required keywords are written (use metadata values if present, otherwise defaults)
458    let byteord_value = metadata
459        .keywords
460        .get("$BYTEORD")
461        .and_then(|k| match k {
462            Keyword::Byte(ByteKeyword::BYTEORD(bo)) => Some(bo.to_keyword_str()),
463            _ => None,
464        })
465        .unwrap_or("1,2,3,4");
466    add_keyword("BYTEORD", byteord_value);
467
468    let datatype_value = metadata
469        .keywords
470        .get("$DATATYPE")
471        .and_then(|k| match k {
472            Keyword::Byte(ByteKeyword::DATATYPE(dt)) => Some(dt.to_keyword_str()),
473            _ => None,
474        })
475        .unwrap_or("F");
476    add_keyword("DATATYPE", datatype_value);
477
478    let mode_value = metadata
479        .keywords
480        .get("$MODE")
481        .and_then(|k| match k {
482            Keyword::String(sk) => Some(sk.get_str().to_string()),
483            _ => None,
484        })
485        .unwrap_or_else(|| "L".to_string());
486    add_keyword("MODE", &mode_value);
487
488    add_keyword("PAR", &n_params.to_string());
489    add_keyword("TOT", &n_events.to_string());
490
491    let nextdata_value = metadata
492        .keywords
493        .get("$NEXTDATA")
494        .and_then(|k| match k {
495            Keyword::String(sk) => Some(sk.get_str().to_string()),
496            _ => None,
497        })
498        .unwrap_or_else(|| "0".to_string());
499    add_keyword("NEXTDATA", &nextdata_value);
500
501    // Serialize all other keywords from metadata
502    let mut sorted_keys: Vec<_> = metadata.keywords.keys().collect();
503    sorted_keys.sort();
504
505    for key in sorted_keys {
506        // Skip keywords we've already written
507        if matches!(
508            key.as_str(),
509            "$BEGINANALYSIS"
510                | "$ENDANALYSIS"
511                | "$BEGINSTEXT"
512                | "$ENDSTEXT"
513                | "$BEGINDATA"
514                | "$ENDDATA"
515                | "$BYTEORD"
516                | "$DATATYPE"
517                | "$MODE"
518                | "$PAR"
519                | "$TOT"
520                | "$NEXTDATA"
521        ) {
522            continue;
523        }
524
525        let keyword = metadata
526            .keywords
527            .get(key)
528            .ok_or_else(|| anyhow!("Keyword '{}' not found in metadata", key))?;
529        let value_str = match keyword {
530            Keyword::Int(int_kw) => match int_kw {
531                IntegerKeyword::TOT(_) => {
532                    // Use actual event count
533                    n_events.to_string()
534                }
535                IntegerKeyword::PAR(_) => {
536                    // Use actual parameter count
537                    n_params.to_string()
538                }
539                _ => int_kw.get_str().to_string(),
540            },
541            Keyword::String(str_kw) => str_kw.get_str().to_string(),
542            Keyword::Float(float_kw) => float_kw.to_string(),
543            Keyword::Byte(byte_kw) => byte_kw.get_str().to_string(),
544            Keyword::Mixed(mixed_kw) => {
545                // Serialize mixed keywords in FCS format (not Debug format)
546                use crate::keyword::MixedKeyword;
547                match mixed_kw {
548                    MixedKeyword::PnE(f1, f2) => format!("{},{}", f1, f2),
549                    MixedKeyword::PnL(wavelengths) => {
550                        format!("({})", wavelengths.iter().map(|w| w.to_string()).collect::<Vec<_>>().join(","))
551                    }
552                    MixedKeyword::PnD(scale_type, lower, upper) => {
553                        format!("({},{},{})", scale_type, lower, upper)
554                    }
555                    MixedKeyword::PnCalibration(f1, s) => {
556                        format!("{}/{}", f1, s)
557                    }
558                    MixedKeyword::RnW(widths) => {
559                        format!("({})", widths.iter().map(|w| w.to_string()).collect::<Vec<_>>().join(","))
560                    }
561                    MixedKeyword::SPILLOVER { n_parameters, parameter_names, matrix_values } => {
562                        let mut result = format!("{}", n_parameters);
563                        for name in parameter_names {
564                            result.push(',');
565                            result.push_str(name);
566                        }
567                        for val in matrix_values {
568                            result.push(',');
569                            result.push_str(&val.to_string());
570                        }
571                        result
572                    }
573                    MixedKeyword::GnE(f1, f2) => format!("{},{}", f1, f2),
574                }
575            },
576        };
577
578        // Remove $ prefix for serialization (it will be added back)
579        let key_without_prefix = key.strip_prefix('$').unwrap_or(key);
580        add_keyword(key_without_prefix, &value_str);
581    }
582
583    // Add trailing delimiter after the last value to properly terminate the text segment
584    // The parser expects the text segment to end with a delimiter after the last value
585    text_segment.push(delimiter);
586
587    Ok(text_segment)
588}
589
590fn serialize_data(df: &DataFrame, metadata: &Metadata) -> Result<Vec<u8>> {
591    let n_events = df.height();
592    let n_params = df.width();
593
594    // Get bytes per parameter from metadata
595    let bytes_per_param = metadata
596        .calculate_bytes_per_event()
597        .map(|bytes_per_event| bytes_per_event / n_params)
598        .unwrap_or(4); // Default to 4 bytes (float32)
599
600    let mut data = Vec::with_capacity(n_events * n_params * bytes_per_param);
601
602    // Get byte order
603    let byte_order = metadata
604        .get_byte_order()
605        .unwrap_or(&ByteOrder::LittleEndian);
606    let is_little_endian = matches!(byte_order, ByteOrder::LittleEndian);
607
608    // Serialize row by row (FCS format: event1_param1, event1_param2, ..., event2_param1, ...)
609    // Get all columns as f32 slices for efficient access
610    let column_names = df.get_column_names();
611    let mut column_data: Vec<&[f32]> = Vec::with_capacity(n_params);
612
613    for col_name in &column_names {
614        let series = df.column(col_name)?;
615        let f32_series = series
616            .f32()
617            .map_err(|e| anyhow!("Column {} is not f32: {}", col_name, e))?;
618        let slice = f32_series
619            .cont_slice()
620            .map_err(|e| anyhow!("Column {} data is not contiguous: {}", col_name, e))?;
621        column_data.push(slice);
622    }
623
624    // Write row by row
625    for row_idx in 0..n_events {
626        for col_data in &column_data {
627            let value = col_data[row_idx];
628
629            // Write as float32 (4 bytes)
630            if is_little_endian {
631                data.write_f32::<LittleEndian>(value)?;
632            } else {
633                use byteorder::BigEndian;
634                data.write_f32::<BigEndian>(value)?;
635            }
636        }
637    }
638
639    Ok(data)
640}
641
642fn build_header(
643    version: &Version,
644    text_start: usize,
645    text_end: usize,
646    data_start: usize,
647    data_end: usize,
648) -> Result<Vec<u8>> {
649    let mut header = vec![0u8; 58];
650
651    // Version string (bytes 0-5)
652    let version_str = format!("{}", version);
653    if version_str.len() > 6 {
654        return Err(anyhow!("Version string too long: {}", version_str));
655    }
656    header[0..version_str.len()].copy_from_slice(version_str.as_bytes());
657
658    // 4 spaces (bytes 6-9)
659    header[6..10].fill(b' ');
660
661    // Text segment offsets (bytes 10-17 and 18-25) - right-aligned, space-padded
662    let text_start_str = format!("{:>8}", text_start);
663    header[10..18].copy_from_slice(text_start_str.as_bytes());
664    let text_end_str = format!("{:>8}", text_end);
665    header[18..26].copy_from_slice(text_end_str.as_bytes());
666
667    // Data segment offsets (bytes 26-33 and 34-41)
668    let data_start_str = format!("{:>8}", data_start);
669    header[26..34].copy_from_slice(data_start_str.as_bytes());
670    let data_end_str = format!("{:>8}", data_end);
671    header[34..42].copy_from_slice(data_end_str.as_bytes());
672
673    // Analysis segment offsets (bytes 42-49 and 50-57) - set to 0
674    header[42..50].copy_from_slice(b"       0");
675    header[50..58].copy_from_slice(b"       0");
676
677    Ok(header)
678}