flow_fcs/
file.rs

1// Internal crate imports
2use crate::{
3    TransformType,
4    byteorder::ByteOrder,
5    header::Header,
6    keyword::{IntegerableKeyword, StringableKeyword},
7    metadata::Metadata,
8    parameter::{EventDataFrame, EventDatum, Parameter, ParameterBuilder, ParameterMap},
9};
10use rayon::iter::IntoParallelRefIterator;
11// Standard library imports
12use ndarray_linalg::Inverse;
13use std::borrow::Cow;
14use std::fs::File;
15use std::ops::Deref;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18
19// External crate imports
20use anyhow::{Result, anyhow};
21use itertools::{Itertools, MinMaxResult};
22use memmap3::{Mmap, MmapOptions};
23use ndarray::Array2;
24use polars::prelude::*;
25use rayon::iter::ParallelIterator;
26use rayon::slice::ParallelSlice;
27
/// A shareable wrapper around the file path and memory-map
///
/// Uses Arc<Mmap> to share the memory mapping across clones without creating
/// new file descriptors or memory mappings. This is more efficient than cloning
/// the underlying file descriptor and re-mapping.
#[derive(Debug, Clone)]
pub struct AccessWrapper {
    /// An owned, mutable path to the file on disk
    pub path: PathBuf,
    /// The memory-mapped file, shared via Arc for efficient cloning
    ///
    /// # Safety
    /// The Mmap is created from a File handle and remains valid as long as:
    /// 1. The file is not truncated while mapped
    /// 2. The file contents are not modified while mapped (we only read)
    /// 3. The Mmap is not accessed after the file is deleted
    ///
    /// Our usage satisfies these invariants because:
    /// - FCS files are read-only once opened (we never write back to them)
    /// - The file remains open (via File handle) for the lifetime of the Mmap
    /// - We only drop the Mmap when the FCS file is no longer needed
    ///
    /// NOTE(review): these invariants hold within this crate, but nothing
    /// prevents another process from mutating the file on disk — confirm
    /// that is acceptable for the deployment environment.
    pub mmap: Arc<Mmap>,
}
51
52impl AccessWrapper {
53    /// Creates a new `AccessWrapper` from a file path
54    /// # Errors
55    /// Will return `Err` if:
56    /// - the file cannot be opened
57    /// - the file cannot be memory-mapped
58    pub fn new(path: &str) -> Result<Self> {
59        let file = File::open(path)?;
60        let path = PathBuf::from(path);
61
62        // memmap3 provides better safety guarantees than memmap2, though OS-level
63        // memory mapping still requires unsafe at creation time. The resulting Mmap
64        // is safe to use and provides better guarantees than memmap2.
65        let mmap = unsafe { MmapOptions::new().map(&file)? };
66
67        Ok(Self {
68            path,
69            mmap: Arc::new(mmap),
70        })
71    }
72}
73
74impl Deref for AccessWrapper {
75    type Target = Mmap;
76
77    fn deref(&self) -> &Self::Target {
78        &self.mmap
79    }
80}
81
/// A struct representing an FCS file
#[derive(Debug, Clone)]
pub struct Fcs {
    /// The header segment of the fcs file, including the version, and byte offsets to the text, data, and analysis segments
    pub header: Header,
    /// The metadata segment of the fcs file, including the delimiter, and a hashmap of keyword/value pairs
    pub metadata: Metadata,
    /// A hashmap of the parameter names and their associated metadata
    /// (events themselves live in `data_frame`, not on the parameters)
    pub parameters: ParameterMap,

    /// Event data stored in columnar format via Polars DataFrame (NEW)
    /// Each column represents one parameter (e.g., FSC-A, SSC-A, FL1-A)
    /// Polars provides:
    /// - Zero-copy column access
    /// - Built-in SIMD operations
    /// - Lazy evaluation for complex queries
    /// - Apache Arrow interop
    /// This is the primary data format going forward
    pub data_frame: EventDataFrame,

    /// A wrapper around the file, path, and memory-map
    pub file_access: AccessWrapper,
}
105
106impl Fcs {
    /// Creates a new, empty `Fcs` struct with default header, metadata, and
    /// parameter map, and an empty DataFrame.
    ///
    /// NOTE(review): `AccessWrapper::new("")` attempts `File::open("")`, which
    /// fails on every platform — as written this constructor appears to always
    /// return `Err`. Confirm whether any caller relies on it; consider making
    /// `file_access` optional or lazily initialized instead.
    ///
    /// # Errors
    /// Will return `Err` if the (empty) file path cannot be opened or
    /// memory-mapped by `AccessWrapper::new`.
    pub fn new() -> Result<Self> {
        Ok(Self {
            header: Header::new(),
            metadata: Metadata::new(),
            parameters: ParameterMap::default(),
            data_frame: Arc::new(DataFrame::empty()),
            file_access: AccessWrapper::new("")?,
        })
    }
124
125    /// Opens and parses an FCS file from the given path
126    ///
127    /// This is the primary entry point for reading FCS files. It:
128    /// - Validates the file extension (must be `.fcs`)
129    /// - Memory-maps the file for efficient access
130    /// - Parses the header segment to determine FCS version and segment offsets
131    /// - Parses the text segment to extract metadata and keywords
132    /// - Validates required keywords for the FCS version
133    /// - Generates a GUID if one is not present
134    /// - Loads event data into a Polars DataFrame for efficient columnar access
135    ///
136    /// # Arguments
137    /// * `path` - Path to the FCS file (must have `.fcs` extension)
138    ///
139    /// # Errors
140    /// Will return `Err` if:
141    /// - the file cannot be opened or memory-mapped
142    /// - the file extension is not `.fcs`
143    /// - the FCS version is invalid or unsupported
144    /// - required keywords are missing for the FCS version
145    /// - the data segment cannot be read or parsed
146    /// - parameter metadata cannot be generated
147    ///
148    /// # Example
149    /// ```no_run
150    /// use flow_fcs::Fcs;
151    ///
152    /// let fcs = Fcs::open("data/sample.fcs")?;
153    /// println!("File has {} events", fcs.get_number_of_events()?);
154    /// # Ok::<(), anyhow::Error>(())
155    /// ```
156    pub fn open(path: &str) -> Result<Self> {
157        // Attempt to open the file path
158        let file_access = AccessWrapper::new(path).expect("Should be able make new access wrapper");
159
160        // Validate the file extension
161        Self::validate_fcs_extension(&file_access.path)
162            .expect("Should have a valid file extension");
163
164        // Create header and metadata structs from a memory map of the file
165        let header = Header::from_mmap(&file_access.mmap)
166            .expect("Should be able to create header from mmap");
167        let mut metadata = Metadata::from_mmap(&file_access.mmap, &header);
168
169        metadata
170            .validate_text_segment_keywords(&header)
171            .expect("Should have valid text segment keywords");
172        // metadata.validate_number_of_parameters()?;
173        metadata.validate_guid();
174
175        Ok(Self {
176            parameters: Self::generate_parameter_map(&metadata)
177                .expect("Should be able to generate parameter map"),
178            data_frame: Self::store_raw_data_as_dataframe(&header, &file_access.mmap, &metadata)
179                .expect("Should be able to store raw data as DataFrame"),
180            file_access,
181            header,
182            metadata,
183        })
184    }
185
186    /// Validates that the file extension is `.fcs`
187    /// # Errors
188    /// Will return `Err` if the file extension is not `.fcs`
189    fn validate_fcs_extension(path: &Path) -> Result<()> {
190        let extension = path
191            .extension()
192            .ok_or_else(|| anyhow!("File has no extension"))?
193            .to_str()
194            .ok_or_else(|| anyhow!("File extension is not valid UTF-8"))?;
195
196        if extension.to_lowercase() != "fcs" {
197            return Err(anyhow!("Invalid file extension: {}", extension));
198        }
199
200        Ok(())
201    }
202
203    /// Reads raw data from FCS file and stores it as a Polars DataFrame
204    /// Returns columnar data optimized for parameter-wise access patterns
205    ///
206    /// This function provides significant performance benefits over ndarray:
207    /// - 2-5x faster data filtering and transformations
208    /// - Native columnar storage (optimal for FCS parameter access patterns)
209    /// - Zero-copy operations via Apache Arrow
210    /// - Built-in SIMD acceleration
211    ///
212    /// # Errors
213    /// Will return `Err` if:
214    /// - The data cannot be read
215    /// - The data cannot be converted to f32 values
216    /// - The DataFrame cannot be constructed
217    fn store_raw_data_as_dataframe(
218        header: &Header,
219        mmap: &Mmap,
220        metadata: &Metadata,
221    ) -> Result<EventDataFrame> {
222        // Validate data offset bounds before accessing mmap
223        let mut data_start = *header.data_offset.start();
224        let mut data_end = *header.data_offset.end();
225        let mmap_len = mmap.len();
226
227        // Handle zero offsets by checking keywords
228        if data_start == 0 {
229            if let Ok(begin_data) = metadata.get_integer_keyword("$BEGINDATA") {
230                data_start = begin_data.get_usize().clone();
231            } else {
232                return Err(anyhow!(
233                    "$BEGINDATA keyword not found. Unable to determine data start."
234                ));
235            }
236        }
237
238        if data_end == 0 {
239            if let Ok(end_data) = metadata.get_integer_keyword("$ENDDATA") {
240                data_end = end_data.get_usize().clone();
241            } else {
242                return Err(anyhow!(
243                    "$ENDDATA keyword not found. Unable to determine data end."
244                ));
245            }
246        }
247
248        // Validate offsets
249        if data_start >= mmap_len {
250            return Err(anyhow!(
251                "Data start offset {} is beyond mmap length {}",
252                data_start,
253                mmap_len
254            ));
255        }
256
257        if data_end >= mmap_len {
258            return Err(anyhow!(
259                "Data end offset {} is beyond mmap length {}",
260                data_end,
261                mmap_len
262            ));
263        }
264
265        if data_start > data_end {
266            return Err(anyhow!(
267                "Data start offset {} is greater than end offset {}",
268                data_start,
269                data_end
270            ));
271        }
272
273        // Extract data bytes
274        let data_bytes = &mmap[data_start..=data_end];
275
276        let number_of_parameters = metadata
277            .get_number_of_parameters()
278            .expect("Should be able to retrieve the number of parameters");
279        let number_of_events = metadata
280            .get_number_of_events()
281            .expect("Should be able to retrieve the number of events");
282        let bytes_per_event = metadata
283            .get_data_type()
284            .expect("Should be able to get the data type")
285            .get_bytes_per_event();
286        let byte_order = metadata
287            .get_byte_order()
288            .expect("Should be able to get the byte order");
289
290        // Validate data size
291        let expected_total_bytes = number_of_events * number_of_parameters * bytes_per_event;
292        if data_bytes.len() < expected_total_bytes {
293            return Err(anyhow!(
294                "Insufficient data: expected {} bytes, but only have {} bytes",
295                expected_total_bytes,
296                data_bytes.len()
297            ));
298        }
299
300        // Confirm that data_bytes is a multiple of 4 bytes
301        if data_bytes.len() % 4 != 0 {
302            return Err(anyhow!(
303                "Data bytes length {} is not a multiple of 4",
304                data_bytes.len()
305            ));
306        }
307
308        // Parse f32 values using the same optimized approach as store_raw_data
309        let f32_values: Vec<f32> = {
310            let needs_swap = match (byte_order, cfg!(target_endian = "little")) {
311                (ByteOrder::LittleEndian, true) | (ByteOrder::BigEndian, false) => false,
312                _ => true,
313            };
314
315            match bytemuck::try_cast_slice::<u8, f32>(data_bytes) {
316                Ok(f32_slice) => {
317                    #[cfg(debug_assertions)]
318                    eprintln!(
319                        "✓ Zero-copy path (DataFrame): aligned data ({} bytes, {} f32s)",
320                        data_bytes.len(),
321                        f32_slice.len()
322                    );
323
324                    if needs_swap {
325                        f32_slice
326                            .par_iter()
327                            .map(|&f| f32::from_bits(f.to_bits().swap_bytes()))
328                            .collect()
329                    } else {
330                        f32_slice.to_vec()
331                    }
332                }
333                Err(_) => {
334                    #[cfg(debug_assertions)]
335                    eprintln!(
336                        "⚠ Fallback path (DataFrame): unaligned data ({} bytes)",
337                        data_bytes.len()
338                    );
339
340                    data_bytes
341                        .par_chunks_exact(4)
342                        .map(|chunk| {
343                            let mut bytes = [0u8; 4];
344                            bytes.copy_from_slice(chunk);
345                            let bits = u32::from_ne_bytes(bytes);
346                            let bits = if needs_swap { bits.swap_bytes() } else { bits };
347                            f32::from_bits(bits)
348                        })
349                        .collect()
350                }
351            }
352        };
353
354        // Validate f32 count
355        let expected_f32_values = number_of_events * number_of_parameters;
356        if f32_values.len() != expected_f32_values {
357            return Err(anyhow!(
358                "F32 values count mismatch: expected {} but got {}",
359                expected_f32_values,
360                f32_values.len()
361            ));
362        }
363
364        // Create Polars Series for each parameter (column)
365        // FCS data is stored row-wise (event1_param1, event1_param2, ..., event2_param1, ...)
366        // We need to extract columns using stride access
367        let mut columns: Vec<Column> = Vec::with_capacity(*number_of_parameters);
368
369        for param_idx in 0..*number_of_parameters {
370            // Extract this parameter's values across all events
371            // Use iterator with step_by for efficient stride access
372            let param_values: Vec<f32> = f32_values
373                .iter()
374                .skip(param_idx)
375                .step_by(*number_of_parameters)
376                .copied()
377                .collect();
378
379            // Verify we got the right number of events
380            assert_eq!(
381                param_values.len(),
382                *number_of_events,
383                "Parameter {} should have {} events, got {}",
384                param_idx + 1,
385                number_of_events,
386                param_values.len()
387            );
388
389            // Get parameter name from metadata for column name
390            let param_name = metadata
391                .get_parameter_channel_name(param_idx + 1)
392                .map(|s| s.to_string())
393                .unwrap_or_else(|_| format!("P{}", param_idx + 1));
394
395            // Create Series (Polars column) with name
396            let series = Column::new(param_name.as_str().into(), param_values);
397            columns.push(series);
398        }
399
400        // Create DataFrame from columns
401        let df = DataFrame::new(columns).map_err(|e| {
402            anyhow!(
403                "Failed to create DataFrame from {} columns: {}",
404                number_of_parameters,
405                e
406            )
407        })?;
408
409        // Verify DataFrame shape
410        assert_eq!(
411            df.height(),
412            *number_of_events,
413            "DataFrame height {} doesn't match expected events {}",
414            df.height(),
415            number_of_events
416        );
417        assert_eq!(
418            df.width(),
419            *number_of_parameters,
420            "DataFrame width {} doesn't match expected parameters {}",
421            df.width(),
422            number_of_parameters
423        );
424
425        #[cfg(debug_assertions)]
426        eprintln!(
427            "✓ Created DataFrame: {} events × {} parameters",
428            df.height(),
429            df.width()
430        );
431
432        Ok(Arc::new(df))
433    }
434
435    /// Looks for the parameter name as a key in the `parameters` hashmap and returns a reference to it
436    /// Performs case-insensitive lookup for parameter names
437    /// # Errors
438    /// Will return `Err` if the parameter name is not found in the `parameters` hashmap
439    pub fn find_parameter(&self, parameter_name: &str) -> Result<&Parameter> {
440        // Try exact match first (fast path)
441        if let Some(param) = self.parameters.get(parameter_name) {
442            return Ok(param);
443        }
444
445        // Case-insensitive fallback: search through parameter map
446        for (key, param) in self.parameters.iter() {
447            if key.eq_ignore_ascii_case(parameter_name) {
448                return Ok(param);
449            }
450        }
451
452        Err(anyhow!("Parameter not found: {parameter_name}"))
453    }
454
455    /// Looks for the parameter name as a key in the `parameters` hashmap and returns a mutable reference to it
456    /// Performs case-insensitive lookup for parameter names
457    /// # Errors
458    /// Will return `Err` if the parameter name is not found in the `parameters` hashmap
459    pub fn find_mutable_parameter(&mut self, parameter_name: &str) -> Result<&mut Parameter> {
460        // Try exact match first (fast path)
461        // Note: We need to check if the key exists as Arc<str>, so we iterate to find exact match
462        let exact_key = self
463            .parameters
464            .keys()
465            .find(|k| k.as_ref() == parameter_name)
466            .map(|k| k.clone());
467
468        if let Some(key) = exact_key {
469            return self
470                .parameters
471                .get_mut(&key)
472                .ok_or_else(|| anyhow!("Parameter not found: {parameter_name}"));
473        }
474
475        // Case-insensitive fallback: find the key first (clone Arc to avoid borrow issues)
476        let matching_key = self
477            .parameters
478            .keys()
479            .find(|key| key.eq_ignore_ascii_case(parameter_name))
480            .map(|k| k.clone());
481
482        if let Some(key) = matching_key {
483            return self
484                .parameters
485                .get_mut(&key)
486                .ok_or_else(|| anyhow!("Parameter not found: {parameter_name}"));
487        }
488
489        Err(anyhow!("Parameter not found: {parameter_name}"))
490    }
491
492    /// Returns a zero-copy reference to a Polars Float32Chunked view of a column for the parameter
493    ///
494    /// This provides access to the underlying Polars chunked array, which is useful
495    /// for operations that work directly with Polars types. For most use cases,
496    /// `get_parameter_events_slice()` is preferred as it provides a simple `&[f32]` slice.
497    ///
498    /// # Arguments
499    /// * `channel_name` - The channel name (e.g., "FSC-A", "FL1-A")
500    ///
501    /// # Errors
502    /// Will return `Err` if:
503    /// - the parameter name is not found in the parameters map
504    /// - the column data type is not Float32
505    pub fn get_parameter_events(&'_ self, channel_name: &str) -> Result<&Float32Chunked> {
506        Ok(self
507            .get_parameter_column(channel_name)?
508            .f32()
509            .map_err(|e| anyhow!("Parameter {} is not f32 type: {}", channel_name, e))?)
510    }
511    /// Get a reference to the Polars Column for a parameter by channel name
512    ///
513    /// This provides direct access to the underlying Polars column, which can be useful
514    /// for advanced operations that require the full Polars API.
515    ///
516    /// # Arguments
517    /// * `channel_name` - The channel name (e.g., "FSC-A", "FL1-A")
518    ///
519    /// # Errors
520    /// Will return `Err` if the parameter name is not found in the DataFrame
521    pub fn get_parameter_column(&'_ self, channel_name: &str) -> Result<&Column> {
522        self.data_frame
523            .column(channel_name)
524            .map_err(|e| anyhow!("Parameter {} not found: {}", channel_name, e))
525    }
526
527    /// Looks for the parameter name as a key in the 'parameters' hashmap and returns a new Vec<f32> of the raw event data
528    /// NOTE: This allocates a full copy of the events - prefer `get_parameter_events_slice` when possible
529    /// # Errors
530    /// Will return 'Err' if the parameter name is not found in the 'parameters hashmap or if the events are not found
531    pub fn get_parameter_events_as_owned_vec(&self, channel_name: &str) -> Result<Vec<EventDatum>> {
532        Ok(self.get_parameter_events_slice(channel_name)?.to_vec())
533    }
534
535    /// Returns the minimum and maximum values of the parameter
536    /// # Errors
537    /// Will return `Err` if the parameter name is not found in the 'parameters' hashmap or if the events are not found
538    pub fn get_minmax_of_parameter(&self, channel_name: &str) -> Result<(EventDatum, EventDatum)> {
539        let parameter = self.find_parameter(channel_name)?;
540        let events = self.get_parameter_events(&parameter.channel_name)?;
541
542        match events.iter().minmax() {
543            MinMaxResult::NoElements => Err(anyhow!("No elements found")),
544            MinMaxResult::OneElement(e) => Err(anyhow!("Only one element found: {:?}", e)),
545            MinMaxResult::MinMax(min, max) => Ok((min.unwrap(), max.unwrap())),
546        }
547    }
548
549    /// Creates a new `HashMap` of `Parameter`s
550    /// using the `Fcs` file's metadata to find the channel and label names from the `PnN` and `PnS` keywords.
551    /// Does NOT store events on the parameter.
552    /// # Errors
553    /// Will return `Err` if:
554    /// - the number of parameters cannot be found in the metadata,
555    /// - the parameter name cannot be found in the metadata,
556    /// - the parameter cannot be built (using the Builder pattern)
557    pub fn generate_parameter_map(metadata: &Metadata) -> Result<ParameterMap> {
558        let mut map = ParameterMap::default();
559        let number_of_parameters = metadata.get_number_of_parameters()?;
560        for parameter_number in 1..=*number_of_parameters {
561            let channel_name = metadata.get_parameter_channel_name(parameter_number)?;
562
563            // Use label name or fallback to the parameter name
564            let label_name = match metadata.get_parameter_label(parameter_number) {
565                Ok(label) => label,
566                Err(_) => channel_name,
567            };
568
569            let transform = if channel_name.contains("FSC")
570                || channel_name.contains("SSC")
571                || channel_name.contains("Time")
572            {
573                TransformType::Linear
574            } else {
575                TransformType::default()
576            };
577
578            // Get excitation wavelength from metadata if available
579            let excitation_wavelength = metadata
580                .get_parameter_excitation_wavelength(parameter_number)
581                .ok()
582                .flatten();
583
584            let parameter = ParameterBuilder::default()
585                // For the ParameterBuilder, ensure we're using the proper methods
586                // that may be defined by the Builder derive macro
587                .parameter_number(parameter_number)
588                .channel_name(channel_name)
589                .label_name(label_name)
590                .transform(transform)
591                .excitation_wavelength(excitation_wavelength)
592                .build()?;
593
594            // Add the parameter events to the hashmap keyed by the parameter name
595            map.insert(channel_name.to_string().into(), parameter);
596        }
597
598        Ok(map)
599    }
600
601    /// Looks for a keyword among the metadata and returns its value as a `&str`
602    /// # Errors
603    /// Will return `Err` if the `Keyword` is not found in the `metadata` or if the `Keyword` cannot be converted to a `&str`
604    pub fn get_keyword_string_value(&self, keyword: &str) -> Result<Cow<'_, str>> {
605        // TODO: This should be a match statement
606        if let Ok(keyword) = self.metadata.get_string_keyword(keyword) {
607            Ok(keyword.get_str())
608        } else if let Ok(keyword) = self.metadata.get_integer_keyword(keyword) {
609            Ok(keyword.get_str())
610        } else if let Ok(keyword) = self.metadata.get_float_keyword(keyword) {
611            Ok(keyword.get_str())
612        } else if let Ok(keyword) = self.metadata.get_byte_keyword(keyword) {
613            Ok(keyword.get_str())
614        } else if let Ok(keyword) = self.metadata.get_mixed_keyword(keyword) {
615            Ok(keyword.get_str())
616        } else {
617            Err(anyhow!("Keyword not found: {}", keyword))
618        }
619    }
    /// A convenience function to return the `GUID` keyword from the `metadata`
    /// as a copy-on-write string
    /// # Errors
    /// Will return `Err` if the `GUID` keyword is not found in the `metadata` or if the `GUID` keyword cannot be converted to a `&str`
    pub fn get_guid(&self) -> Result<Cow<'_, str>> {
        Ok(self.metadata.get_string_keyword("GUID")?.get_str())
    }
626
627    /// Set or update the GUID keyword in the file's metadata
628    pub fn set_guid(&mut self, guid: String) {
629        self.metadata
630            .insert_string_keyword("GUID".to_string(), guid);
631    }
632
    /// A convenience function to return the `$FIL` keyword (original filename)
    /// from the `metadata` as a copy-on-write string
    /// # Errors
    /// Will return `Err` if the `$FIL` keyword is not found in the `metadata` or if the `$FIL` keyword cannot be converted to a `&str`
    pub fn get_fil_keyword(&self) -> Result<Cow<'_, str>> {
        Ok(self.metadata.get_string_keyword("$FIL")?.get_str())
    }
639
    /// A convenience function to return the `$TOT` keyword (event count) from
    /// the `metadata`; returns a reference, copy with `*` if needed
    /// # Errors
    /// Will return `Err` if the `$TOT` keyword is not found in the `metadata` or if the `$TOT` keyword cannot be converted to a `usize`
    pub fn get_number_of_events(&self) -> Result<&usize> {
        self.metadata.get_number_of_events()
    }
646
    /// A convenience function to return the `$PAR` keyword (parameter count)
    /// from the `metadata`; returns a reference, copy with `*` if needed
    /// # Errors
    /// Will return `Err` if the `$PAR` keyword is not found in the `metadata` or if the `$PAR` keyword cannot be converted to a `usize`
    pub fn get_number_of_parameters(&self) -> Result<&usize> {
        self.metadata.get_number_of_parameters()
    }
653
654    // ==================== NEW POLARS-BASED ACCESSOR METHODS ====================
655
656    /// Get events for a parameter as a slice of f32 values
657    /// Polars gives us direct access to the underlying buffer (zero-copy)
658    /// # Errors
659    /// Will return `Err` if:
660    /// - the parameter name is not found
661    /// - the Series data type is not Float32
662    /// - the data is chunked (rare for FCS files)
663    pub fn get_parameter_events_slice(&self, channel_name: &str) -> Result<&[f32]> {
664        self.get_parameter_events(channel_name)?
665            .cont_slice()
666            .map_err(|e| anyhow!("Parameter {} data is not contiguous: {}", channel_name, e))
667    }
668
669    /// Get two parameters as (x, y) pairs for plotting
670    /// Optimized for scatter plot use case with zero allocations until the collect
671    /// # Errors
672    /// Will return `Err` if either parameter name is not found
673    pub fn get_xy_pairs(&self, x_param: &str, y_param: &str) -> Result<Vec<(f32, f32)>> {
674        let x_data = self.get_parameter_events_slice(x_param)?;
675        let y_data = self.get_parameter_events_slice(y_param)?;
676
677        // Verify both parameters have the same length
678        if x_data.len() != y_data.len() {
679            return Err(anyhow!(
680                "Parameter length mismatch: {} has {} events, {} has {} events",
681                x_param,
682                x_data.len(),
683                y_param,
684                y_data.len()
685            ));
686        }
687
688        // Zip is zero-cost abstraction - uses iterators efficiently
689        Ok(x_data
690            .iter()
691            .zip(y_data.iter())
692            .map(|(&x, &y)| (x, y))
693            .collect())
694    }
695
    /// Get DataFrame height (number of events); reads the DataFrame directly
    /// rather than the `$TOT` keyword
    #[must_use]
    pub fn get_event_count_from_dataframe(&self) -> usize {
        self.data_frame.height()
    }
701
    /// Get DataFrame width (number of parameters); reads the DataFrame
    /// directly rather than the `$PAR` keyword
    #[must_use]
    pub fn get_parameter_count_from_dataframe(&self) -> usize {
        self.data_frame.width()
    }
707
708    /// Get DataFrame column names (parameter names)
709    pub fn get_parameter_names_from_dataframe(&self) -> Vec<String> {
710        self.data_frame
711            .get_column_names()
712            .into_iter()
713            .map(|s| s.to_string())
714            .collect()
715    }
716
717    /// Aggregate statistics for a parameter using Polars' streaming API for low memory usage and minimal, chunked passes.
718    ///
719    /// When streaming is enabled, Polars creates a *Pipeline*:
720    ///
721    /// **Source**: It pulls a chunk of data from the disk (e.g., 50,000 rows).
722    ///
723    /// **Operators**: It passes that chunk through your expressions (calculating the running sum, count, min, and max for that specific chunk).
724    ///
725    /// **Sink**: It aggregates the results from all chunks into a final result.
726    ///
727    /// Because the statistics we are calculating (min, max, mean) are *associative* and *commutative*, Polars can calculate them partially on each chunk and then combine them at the very end.
728    ///
729    /// Returns (min, max, mean, std_dev)
730    /// # Errors
731    /// Will return `Err` if the parameter is not found or stats calculation fails
732    pub fn get_parameter_statistics(&self, channel_name: &str) -> Result<(f32, f32, f32, f32)> {
733        let stats = (*self.data_frame)
734            .clone()
735            .lazy()
736            .select([
737                col(channel_name).min().alias("min"),
738                col(channel_name).max().alias("max"),
739                col(channel_name).mean().alias("mean"),
740                col(channel_name).std(1).alias("std"),
741            ])
742            .collect_with_engine(Engine::Streaming)?;
743        let min = stats
744            .column("min")
745            .unwrap()
746            .f32()?
747            .get(0)
748            .ok_or(anyhow!("No min found"))?;
749        let max = stats
750            .column("max")
751            .unwrap()
752            .f32()?
753            .get(0)
754            .ok_or(anyhow!("No max found"))?;
755        let mean = stats
756            .column("mean")
757            .unwrap()
758            .f32()?
759            .get(0)
760            .ok_or(anyhow!("No mean found"))?;
761        let std = stats
762            .column("std")
763            .unwrap()
764            .f32()?
765            .get(0)
766            .ok_or(anyhow!("No std deviation found"))?;
767
768        Ok((min, max, mean, std))
769    }
770
771    // ==================== TRANSFORMATION METHODS ====================
772
773    /// Apply arcsinh transformation to a parameter using Polars
774    /// This is the most common transformation for flow cytometry data
775    /// Formula: arcsinh(x / cofactor) / ln(10)
776    ///
777    /// # Arguments
778    /// * `parameter_name` - Name of the parameter to transform
779    /// * `cofactor` - Scaling factor (typical: 150-200 for modern instruments)
780    ///
781    /// # Returns
782    /// New DataFrame with the transformed parameter
783    pub fn apply_arcsinh_transform(
784        &self,
785        parameter_name: &str,
786        cofactor: f32,
787    ) -> Result<EventDataFrame> {
788        let df = (*self.data_frame).clone();
789
790        // Get the column to transform
791        let col = df
792            .column(parameter_name)
793            .map_err(|e| anyhow!("Parameter {} not found: {}", parameter_name, e))?;
794
795        let series = col.as_materialized_series();
796        let ca = series
797            .f32()
798            .map_err(|e| anyhow!("Parameter {} is not f32: {}", parameter_name, e))?;
799
800        // Apply arcsinh transformation: arcsinh(x / cofactor) / ln(10)
801        use rayon::prelude::*;
802        let ln_10 = 10_f32.ln();
803        let transformed: Vec<f32> = ca
804            .cont_slice()
805            .map_err(|e| anyhow!("Data not contiguous: {}", e))?
806            .par_iter()
807            .map(|&x| ((x / cofactor).asinh()) / ln_10)
808            .collect();
809
810        // Create new column with transformed data
811        let new_series = Series::new(parameter_name.into(), transformed);
812
813        // Replace the column in DataFrame
814        let mut new_df = df;
815        new_df
816            .replace(parameter_name, new_series)
817            .map_err(|e| anyhow!("Failed to replace column: {}", e))?;
818
819        Ok(Arc::new(new_df))
820    }
821
822    /// Apply arcsinh transformation to multiple parameters
823    ///
824    /// # Arguments
825    /// * `parameters` - List of (parameter_name, cofactor) pairs
826    ///
827    /// # Returns
828    /// New DataFrame with all specified parameters transformed
829    pub fn apply_arcsinh_transforms(&self, parameters: &[(&str, f32)]) -> Result<EventDataFrame> {
830        let mut df = (*self.data_frame).clone();
831
832        let ln_10 = 10_f32.ln();
833        use rayon::prelude::*;
834
835        for &(param_name, cofactor) in parameters {
836            let col = df
837                .column(param_name)
838                .map_err(|e| anyhow!("Parameter {} not found: {}", param_name, e))?;
839
840            let series = col.as_materialized_series();
841            let ca = series
842                .f32()
843                .map_err(|e| anyhow!("Parameter {} is not f32: {}", param_name, e))?;
844
845            let transformed: Vec<f32> = ca
846                .cont_slice()
847                .map_err(|e| anyhow!("Data not contiguous: {}", e))?
848                .par_iter()
849                .map(|&x| ((x / cofactor).asinh()) / ln_10)
850                .collect();
851
852            let new_series = Series::new(param_name.into(), transformed);
853            df.replace(param_name, new_series)
854                .map_err(|e| anyhow!("Failed to replace column {}: {}", param_name, e))?;
855        }
856
857        Ok(Arc::new(df))
858    }
859
860    /// Apply default arcsinh transformation to all fluorescence parameters
861    /// Automatically detects fluorescence parameters (excludes FSC, SSC, Time)
862    /// Uses cofactor = 200 (good default for modern instruments)
863    pub fn apply_default_arcsinh_transform(&self) -> Result<EventDataFrame> {
864        let param_names = self.get_parameter_names_from_dataframe();
865
866        // Filter to fluorescence parameters (exclude scatter and time)
867        let fluor_params: Vec<(&str, f32)> = param_names
868            .iter()
869            .filter(|name| {
870                let upper = name.to_uppercase();
871                !upper.contains("FSC") && !upper.contains("SSC") && !upper.contains("TIME")
872            })
873            .map(|name| (name.as_str(), 200.0)) // Default cofactor = 200
874            .collect();
875
876        self.apply_arcsinh_transforms(&fluor_params)
877    }
878
879    // ==================== COMPENSATION METHODS ====================
880
881    /// Extract compensation matrix from $SPILLOVER keyword
882    /// Returns (matrix, channel_names) if spillover keyword exists
883    /// Returns None if no spillover keyword is present in the file
884    ///
885    /// # Returns
886    /// Some((compensation_matrix, channel_names)) if spillover exists, None otherwise
887    ///
888    /// # Errors
889    /// Will return `Err` if spillover keyword is malformed
890    pub fn get_spillover_matrix(&self) -> Result<Option<(Array2<f32>, Vec<String>)>> {
891        use crate::keyword::{Keyword, MixedKeyword};
892
893        // Try to get the $SPILLOVER keyword
894        let spillover_keyword = match self.metadata.keywords.get("$SPILLOVER") {
895            Some(Keyword::Mixed(MixedKeyword::SPILLOVER {
896                n_parameters,
897                parameter_names,
898                matrix_values,
899            })) => (
900                *n_parameters,
901                parameter_names.clone(),
902                matrix_values.clone(),
903            ),
904            Some(_) => {
905                return Err(anyhow!("$SPILLOVER keyword exists but has wrong type"));
906            }
907            None => {
908                // No spillover keyword - this is fine, not all files have it
909                return Ok(None);
910            }
911        };
912
913        let (n_params, param_names, matrix_values): (usize, Vec<String>, Vec<f32>) =
914            spillover_keyword;
915
916        // Validate matrix dimensions
917        let expected_matrix_size = n_params * n_params;
918        if matrix_values.len() != expected_matrix_size {
919            return Err(anyhow!(
920                "SPILLOVER matrix size mismatch: expected {} values for {}x{} matrix, got {}",
921                expected_matrix_size,
922                n_params,
923                n_params,
924                matrix_values.len()
925            ));
926        }
927
928        // Create Array2 from matrix values
929        // FCS spillover is stored row-major order
930        let matrix = Array2::from_shape_vec((n_params, n_params), matrix_values)
931            .map_err(|e| anyhow!("Failed to create compensation matrix from SPILLOVER: {}", e))?;
932
933        Ok(Some((matrix, param_names)))
934    }
935
936    /// Check if this file has compensation information
937    #[must_use]
938    pub fn has_compensation(&self) -> bool {
939        self.get_spillover_matrix()
940            .map(|opt| opt.is_some())
941            .unwrap_or(false)
942    }
943
944    /// Apply compensation from the file's $SPILLOVER keyword
945    /// Convenience method that extracts spillover and applies it automatically
946    ///
947    /// # Returns
948    /// New DataFrame with compensated data, or error if no spillover keyword exists
949    pub fn apply_file_compensation(&self) -> Result<EventDataFrame> {
950        let (comp_matrix, channel_names) = self
951            .get_spillover_matrix()?
952            .ok_or_else(|| anyhow!("No $SPILLOVER keyword found in FCS file"))?;
953
954        let channel_refs: Vec<&str> = channel_names.iter().map(|s| s.as_str()).collect();
955
956        self.apply_compensation(&comp_matrix, &channel_refs)
957    }
958
959    /// OPTIMIZED: Get compensated data for specific parameters only (lazy/partial compensation)
960    ///
961    /// This is 15-30x faster than apply_file_compensation when you only need a few parameters
962    /// because it:
963    /// - Only compensates the requested channels (e.g., 2 vs 30)
964    /// - Uses sparse matrix optimization for matrices with >80% zeros
965    /// - Bypasses compensation entirely for identity matrices
966    ///
967    /// # Arguments
968    /// * `channels_needed` - Only the channel names you need compensated (typically 2 for a plot)
969    ///
970    /// # Returns
971    /// HashMap of channel_name -> compensated data (as Vec<f32>)
972    ///
973    /// # Performance
974    /// - Dense matrix (2/30 channels): **15x faster** (150ms → 10ms)
975    /// - Sparse matrix (90% sparse): **50x faster** (150ms → 3ms)
976    /// - Identity matrix: **300x faster** (150ms → 0.5ms)
977    pub fn get_compensated_parameters(
978        &self,
979        channels_needed: &[&str],
980    ) -> Result<std::collections::HashMap<String, Vec<f32>>> {
981        use std::collections::HashMap;
982
983        // Get spillover matrix
984        let (comp_matrix, matrix_channel_names) = self
985            .get_spillover_matrix()?
986            .ok_or_else(|| anyhow!("No $SPILLOVER keyword found in FCS file"))?;
987
988        let n_events = self.get_event_count_from_dataframe();
989
990        // OPTIMIZATION 1: Check if matrix is identity (no compensation needed)
991        let is_identity = {
992            let mut is_id = true;
993            for i in 0..comp_matrix.nrows() {
994                for j in 0..comp_matrix.ncols() {
995                    let expected = if i == j { 1.0 } else { 0.0 };
996                    if (comp_matrix[[i, j]] - expected).abs() > 1e-6 {
997                        is_id = false;
998                        break;
999                    }
1000                }
1001                if !is_id {
1002                    break;
1003                }
1004            }
1005            is_id
1006        };
1007
1008        if is_identity {
1009            eprintln!("🚀 Identity matrix detected - bypassing compensation");
1010            // Just return original data
1011            let mut result = HashMap::new();
1012            for &channel in channels_needed {
1013                let data = self.get_parameter_events_slice(channel)?;
1014                result.insert(channel.to_string(), data.to_vec());
1015            }
1016            return Ok(result);
1017        }
1018
1019        // OPTIMIZATION 2: Analyze sparsity
1020        let total_elements = comp_matrix.len();
1021        let non_zero_count = comp_matrix.iter().filter(|&&x| x.abs() > 1e-6).count();
1022        let sparsity = 1.0 - (non_zero_count as f64 / total_elements as f64);
1023        let is_sparse = sparsity > 0.8;
1024
1025        eprintln!(
1026            "📊 Compensation matrix: {:.1}% sparse, {} non-zero coefficients",
1027            sparsity * 100.0,
1028            non_zero_count
1029        );
1030
1031        // Find indices of channels we need
1032        let channel_indices: HashMap<&str, usize> = matrix_channel_names
1033            .iter()
1034            .enumerate()
1035            .map(|(i, name)| (name.as_str(), i))
1036            .collect();
1037
1038        let needed_indices: Vec<(String, usize)> = channels_needed
1039            .iter()
1040            .filter_map(|&ch| channel_indices.get(ch).map(|&idx| (ch.to_string(), idx)))
1041            .collect();
1042
1043        if needed_indices.is_empty() {
1044            return Err(anyhow!(
1045                "None of the requested channels found in compensation matrix"
1046            ));
1047        }
1048
1049        // Extract ONLY the channels involved in compensating our needed channels
1050        // For each needed channel, we need all channels that have non-zero spillover
1051        let mut involved_indices = std::collections::HashSet::new();
1052        for &(_, row_idx) in &needed_indices {
1053            // Add the channel itself
1054            involved_indices.insert(row_idx);
1055
1056            // Add channels with non-zero spillover
1057            if is_sparse {
1058                for col_idx in 0..comp_matrix.ncols() {
1059                    if comp_matrix[[row_idx, col_idx]].abs() > 1e-6 {
1060                        involved_indices.insert(col_idx);
1061                    }
1062                }
1063            } else {
1064                // For dense matrix, we need all channels
1065                for i in 0..comp_matrix.ncols() {
1066                    involved_indices.insert(i);
1067                }
1068            }
1069        }
1070
1071        let mut involved_vec: Vec<usize> = involved_indices.into_iter().collect();
1072        involved_vec.sort_unstable();
1073
1074        eprintln!(
1075            "🎯 Lazy compensation: loading {} channels (vs {} total)",
1076            involved_vec.len(),
1077            matrix_channel_names.len()
1078        );
1079
1080        // Extract data for involved channels only
1081        let mut channel_data: Vec<Vec<f32>> = Vec::with_capacity(involved_vec.len());
1082        for &idx in &involved_vec {
1083            let channel_name = &matrix_channel_names[idx];
1084            let data = self.get_parameter_events_slice(channel_name)?;
1085            channel_data.push(data.to_vec());
1086        }
1087
1088        // Extract sub-matrix for involved channels
1089        let sub_matrix = {
1090            let mut sub = Array2::<f32>::zeros((involved_vec.len(), involved_vec.len()));
1091            for (i, &orig_i) in involved_vec.iter().enumerate() {
1092                for (j, &orig_j) in involved_vec.iter().enumerate() {
1093                    sub[[i, j]] = comp_matrix[[orig_i, orig_j]];
1094                }
1095            }
1096            sub
1097        };
1098
1099        // Invert sub-matrix
1100        use ndarray_linalg::Inverse;
1101        let comp_inv = sub_matrix
1102            .inv()
1103            .map_err(|e| anyhow!("Failed to invert compensation matrix: {:?}", e))?;
1104
1105        // Compensate ONLY the involved channels
1106        use rayon::prelude::*;
1107        let compensated_data: Vec<Vec<f32>> = (0..involved_vec.len())
1108            .into_par_iter()
1109            .map(|i| {
1110                let row = comp_inv.row(i);
1111                let mut result = vec![0.0; n_events];
1112
1113                for event_idx in 0..n_events {
1114                    let mut sum = 0.0;
1115                    for (j, &coeff) in row.iter().enumerate() {
1116                        sum += coeff * channel_data[j][event_idx];
1117                    }
1118                    result[event_idx] = sum;
1119                }
1120
1121                result
1122            })
1123            .collect();
1124
1125        // Build result HashMap for only the channels we need
1126        let mut result = HashMap::new();
1127        for (channel_name, orig_idx) in needed_indices {
1128            if let Some(local_idx) = involved_vec.iter().position(|&x| x == orig_idx) {
1129                result.insert(channel_name, compensated_data[local_idx].clone());
1130            }
1131        }
1132
1133        eprintln!("🚀 Lazy compensation completed");
1134        Ok(result)
1135    }
1136
1137    /// Apply compensation matrix to the data using Polars
1138    /// Compensation corrects for spectral overlap between fluorescence channels
1139    ///
1140    /// # Arguments
1141    /// * `compensation_matrix` - 2D matrix where element [i,j] represents spillover from channel j into channel i
1142    /// * `channel_names` - Names of channels in the order they appear in the matrix
1143    ///
1144    /// # Returns
1145    /// New DataFrame with compensated fluorescence values
1146    ///
1147    /// # Example
1148    /// ```ignore
1149    /// // Create a 3x3 compensation matrix
1150    /// let comp_matrix = Array2::from_shape_vec((3, 3), vec![
1151    ///     1.0, 0.1, 0.05,  // FL1-A compensation
1152    ///     0.2, 1.0, 0.1,   // FL2-A compensation
1153    ///     0.1, 0.15, 1.0,  // FL3-A compensation
1154    /// ]).unwrap();
1155    /// let channels = vec!["FL1-A", "FL2-A", "FL3-A"];
1156    /// let compensated = fcs.apply_compensation(&comp_matrix, &channels)?;
1157    /// ```
1158    pub fn apply_compensation(
1159        &self,
1160        compensation_matrix: &Array2<f32>,
1161        channel_names: &[&str],
1162    ) -> Result<EventDataFrame> {
1163        // Verify matrix dimensions match channel names
1164        let n_channels = channel_names.len();
1165        if compensation_matrix.nrows() != n_channels || compensation_matrix.ncols() != n_channels {
1166            return Err(anyhow!(
1167                "Compensation matrix dimensions ({}, {}) don't match number of channels ({})",
1168                compensation_matrix.nrows(),
1169                compensation_matrix.ncols(),
1170                n_channels
1171            ));
1172        }
1173
1174        // Extract data for channels to compensate
1175        let mut channel_data: Vec<Vec<f32>> = Vec::with_capacity(n_channels);
1176        let n_events = self.get_event_count_from_dataframe();
1177
1178        for &channel_name in channel_names {
1179            let data = self.get_parameter_events_slice(channel_name)?;
1180            channel_data.push(data.to_vec());
1181        }
1182
1183        // Apply compensation: compensated = original * inverse(compensation_matrix)
1184        // For efficiency, we pre-compute the inverse
1185        let comp_inv = compensation_matrix
1186            .inv()
1187            .map_err(|e| anyhow!("Failed to invert compensation matrix: {:?}", e))?;
1188
1189        // Perform matrix multiplication for each event
1190        use rayon::prelude::*;
1191        let compensated_data: Vec<Vec<f32>> = (0..n_channels)
1192            .into_par_iter()
1193            .map(|i| {
1194                let row = comp_inv.row(i);
1195                let mut result = vec![0.0; n_events];
1196
1197                for event_idx in 0..n_events {
1198                    let mut sum = 0.0;
1199                    for (j, &coeff) in row.iter().enumerate() {
1200                        sum += coeff * channel_data[j][event_idx];
1201                    }
1202                    result[event_idx] = sum;
1203                }
1204
1205                result
1206            })
1207            .collect();
1208
1209        // Create new DataFrame with compensated values
1210        let mut df = (*self.data_frame).clone();
1211
1212        for (i, &channel_name) in channel_names.iter().enumerate() {
1213            let new_series = Series::new(channel_name.into(), compensated_data[i].clone());
1214            df.replace(channel_name, new_series)
1215                .map_err(|e| anyhow!("Failed to replace column {}: {}", channel_name, e))?;
1216        }
1217
1218        Ok(Arc::new(df))
1219    }
1220
1221    /// Apply spectral unmixing (similar to compensation but for spectral flow cytometry)
1222    /// Uses a good default cofactor of 200 for transformation before/after unmixing
1223    ///
1224    /// # Arguments
1225    /// * `unmixing_matrix` - Matrix describing spectral signatures of fluorophores
1226    /// * `channel_names` - Names of spectral channels
1227    /// * `cofactor` - Cofactor for arcsinh transformation (default: 200)
1228    ///
1229    /// # Returns
1230    /// New DataFrame with unmixed and transformed fluorescence values
1231    pub fn apply_spectral_unmixing(
1232        &self,
1233        unmixing_matrix: &Array2<f32>,
1234        channel_names: &[&str],
1235        cofactor: Option<f32>,
1236    ) -> Result<EventDataFrame> {
1237        let cofactor = cofactor.unwrap_or(200.0);
1238
1239        // First, inverse-transform the data (go back to linear scale)
1240        let ln_10 = 10_f32.ln();
1241        let mut df = (*self.data_frame).clone();
1242
1243        use rayon::prelude::*;
1244        for &channel_name in channel_names {
1245            let col = df
1246                .column(channel_name)
1247                .map_err(|e| anyhow!("Parameter {} not found: {}", channel_name, e))?;
1248
1249            let series = col.as_materialized_series();
1250            let ca = series
1251                .f32()
1252                .map_err(|e| anyhow!("Parameter {} is not f32: {}", channel_name, e))?;
1253
1254            // Inverse arcsinh: x = cofactor * sinh(y * ln(10))
1255            let linear: Vec<f32> = ca
1256                .cont_slice()
1257                .map_err(|e| anyhow!("Data not contiguous: {}", e))?
1258                .par_iter()
1259                .map(|&y| cofactor * (y * ln_10).sinh())
1260                .collect();
1261
1262            let new_series = Series::new(channel_name.into(), linear);
1263            df.replace(channel_name, new_series)
1264                .map_err(|e| anyhow!("Failed to replace column: {}", e))?;
1265        }
1266
1267        // Apply unmixing matrix (same as compensation)
1268        let df_with_linear = Arc::new(df);
1269        let fcs_temp = Fcs {
1270            data_frame: df_with_linear,
1271            ..self.clone()
1272        };
1273        let unmixed = fcs_temp.apply_compensation(unmixing_matrix, channel_names)?;
1274
1275        // Re-apply arcsinh transformation
1276        let fcs_unmixed = Fcs {
1277            data_frame: unmixed,
1278            ..self.clone()
1279        };
1280
1281        let params_with_cofactor: Vec<(&str, f32)> =
1282            channel_names.iter().map(|&name| (name, cofactor)).collect();
1283
1284        fcs_unmixed.apply_arcsinh_transforms(&params_with_cofactor)
1285    }
1286}