Skip to main content

flow_fcs/
file.rs

1// Internal crate imports
2use crate::{
3    FcsDataType, TransformType, Transformable,
4    byteorder::ByteOrder,
5    header::Header,
6    keyword::{IntegerableKeyword, StringableKeyword},
7    metadata::Metadata,
8    parameter::{EventDataFrame, EventDatum, Parameter, ParameterBuilder, ParameterMap},
9};
10// Standard library imports
11use std::borrow::Cow;
12use std::fs::File;
13use std::ops::Deref;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16
17// External crate imports
18use anyhow::{Context, Result, anyhow};
19use byteorder::{BigEndian as BE, ByteOrder as BO, LittleEndian as LE};
20use itertools::{Itertools, MinMaxResult};
21use memmap3::{Mmap, MmapOptions};
22use faer::Mat;
23use polars::prelude::*;
24use rayon::prelude::*;
25
26/// Threshold for parallel processing: only use parallel for datasets larger than this
27/// Below this threshold, parallel overhead exceeds benefits
28/// Based on benchmarks: 400,000 values (50,000 events × 8 parameters)
29/// - Float32: Always use sequential (benchmarks show sequential is 2-13x faster)
30/// - Int16/Int32/Float64: Use parallel for datasets with ≥400k values
31const PARALLEL_THRESHOLD: usize = 400_000;
32
33/// A shareable wrapper around the file path and memory-map
34///
35/// Uses Arc<Mmap> to share the memory mapping across clones without creating
36/// new file descriptors or memory mappings. This is more efficient than cloning
37/// the underlying file descriptor and re-mapping.
38#[derive(Debug, Clone)]
39pub struct AccessWrapper {
40    /// An owned, mutable path to the file on disk
41    pub path: PathBuf,
42    /// The memory-mapped file, shared via Arc for efficient cloning
43    ///
44    /// # Safety
45    /// The Mmap is created from a File handle and remains valid as long as:
46    /// 1. The file is not truncated while mapped
47    /// 2. The file contents are not modified while mapped (we only read)
48    /// 3. The Mmap is not accessed after the file is deleted
49    ///
50    /// Our usage satisfies these invariants because:
51    /// - FCS files are read-only once opened (we never write back to them)
52    /// - The file remains open (via File handle) for the lifetime of the Mmap
53    /// - We only drop the Mmap when the FCS file is no longer needed
54    pub mmap: Arc<Mmap>,
55}
56
57impl AccessWrapper {
58    /// Creates a new `AccessWrapper` from a file path
59    /// # Errors
60    /// Will return `Err` if:
61    /// - the file cannot be opened
62    /// - the file cannot be memory-mapped
63    pub fn new(path: &str) -> Result<Self> {
64        let file = File::open(path)?;
65        let path = PathBuf::from(path);
66
67        // memmap3 provides better safety guarantees than memmap2, though OS-level
68        // memory mapping still requires unsafe at creation time. The resulting Mmap
69        // is safe to use and provides better guarantees than memmap2.
70        let mmap = unsafe { MmapOptions::new().map(&file)? };
71
72        Ok(Self {
73            path,
74            mmap: Arc::new(mmap),
75        })
76    }
77}
78
79impl Deref for AccessWrapper {
80    type Target = Mmap;
81
82    fn deref(&self) -> &Self::Target {
83        &self.mmap
84    }
85}
86
87/// A struct representing an FCS file
88#[derive(Debug, Clone)]
89pub struct Fcs {
90    /// The header segment of the fcs file, including the version, and byte offsets to the text, data, and analysis segments
91    pub header: Header,
92    /// The metadata segment of the fcs file, including the delimiter, and a hashmap of keyword/value pairs
93    pub metadata: Metadata,
94    /// A hashmap of the parameter names and their associated metadata
95    pub parameters: ParameterMap,
96
97    /// Event data stored in columnar format via Polars DataFrame (NEW)
98    /// Each column represents one parameter (e.g., FSC-A, SSC-A, FL1-A)
99    /// Polars provides:
100    /// - Zero-copy column access
101    /// - Built-in SIMD operations
102    /// - Lazy evaluation for complex queries
103    /// - Apache Arrow interop
104    /// This is the primary data format going forward
105    pub data_frame: EventDataFrame,
106
107    /// A wrapper around the file, path, and memory-map
108    pub file_access: AccessWrapper,
109}
110
111impl Fcs {
112    /// Creates a new Fcs file struct
113    /// # Errors
114    /// Will return `Err` if:
115    /// - the file cannot be opened,
116    /// - the file extension is not `fcs`,
117    /// - the TEXT segment cannot be validated,
118    /// - the raw data cannot be read,
119    /// - the parameter names and labels cannot be generated
120    pub fn new() -> Result<Self> {
121        Ok(Self {
122            header: Header::new(),
123            metadata: Metadata::new(),
124            parameters: ParameterMap::default(),
125            data_frame: Arc::new(DataFrame::empty()),
126            file_access: AccessWrapper::new("")?,
127        })
128    }
129
130    /// Opens and parses an FCS file from the given path
131    ///
132    /// This is the primary entry point for reading FCS files. It:
133    /// - Validates the file extension (must be `.fcs`)
134    /// - Memory-maps the file for efficient access
135    /// - Parses the header segment to determine FCS version and segment offsets
136    /// - Parses the text segment to extract metadata and keywords
137    /// - Validates required keywords for the FCS version
138    /// - Generates a GUID if one is not present
139    /// - Loads event data into a Polars DataFrame for efficient columnar access
140    ///
141    /// # Arguments
142    /// * `path` - Path to the FCS file (must have `.fcs` extension)
143    ///
144    /// # Errors
145    /// Will return `Err` if:
146    /// - the file cannot be opened or memory-mapped
147    /// - the file extension is not `.fcs`
148    /// - the FCS version is invalid or unsupported
149    /// - required keywords are missing for the FCS version
150    /// - the data segment cannot be read or parsed
151    /// - parameter metadata cannot be generated
152    ///
153    /// # Example
154    /// ```no_run
155    /// use flow_fcs::Fcs;
156    ///
157    /// let fcs = Fcs::open("data/sample.fcs")?;
158    /// println!("File has {} events", fcs.get_number_of_events()?);
159    /// # Ok::<(), anyhow::Error>(())
160    /// ```
161    pub fn open(path: &str) -> Result<Self> {
162        use tracing::debug;
163
164        // Attempt to open the file path
165        let file_access =
166            AccessWrapper::new(path).with_context(|| format!("Failed to open file: {}", path))?;
167
168        // Validate the file extension
169        Self::validate_fcs_extension(&file_access.path).with_context(|| {
170            format!("Invalid file extension for: {}", file_access.path.display())
171        })?;
172
173        // Create header and metadata structs from a memory map of the file
174        let header = Header::from_mmap(&file_access.mmap).with_context(|| {
175            format!(
176                "Failed to parse header from file: {}",
177                file_access.path.display()
178            )
179        })?;
180        let mut metadata = Metadata::from_mmap(&file_access.mmap, &header);
181
182        metadata
183            .validate_text_segment_keywords(&header)
184            .with_context(|| {
185                format!(
186                    "Failed to validate text segment keywords in file: {}",
187                    file_access.path.display()
188                )
189            })?;
190        // metadata.validate_number_of_parameters()?;
191        metadata.validate_guid();
192
193        // Log $TOT keyword value
194        let tot_events = metadata.get_number_of_events().ok().copied();
195        if let Some(tot) = tot_events {
196            debug!("FCS file $TOT keyword: {} events", tot);
197        }
198
199        let parameters = Self::generate_parameter_map(&metadata).map_err(|e| {
200            let diagnostic = Self::format_diagnostic_info(&header, &metadata, &file_access.path);
201            anyhow!("Failed to generate parameter map: {}\n\n{}", e, diagnostic)
202        })?;
203
204        let data_frame = Self::store_raw_data_as_dataframe(&header, &file_access.mmap, &metadata)
205            .map_err(|e| {
206            let diagnostic = Self::format_diagnostic_info(&header, &metadata, &file_access.path);
207            anyhow!(
208                "Failed to store raw data as DataFrame: {}\n\n{}",
209                e,
210                diagnostic
211            )
212        })?;
213
214        let fcs = Self {
215            parameters,
216            data_frame,
217            file_access,
218            header,
219            metadata,
220        };
221
222        // Log DataFrame event count and compare to $TOT
223        let df_events = fcs.get_event_count_from_dataframe();
224        if let Some(tot) = tot_events {
225            if df_events != tot {
226                tracing::warn!(
227                    "Event count mismatch: DataFrame has {} events but $TOT keyword says {} (difference: {})",
228                    df_events,
229                    tot,
230                    tot as i64 - df_events as i64
231                );
232            } else {
233                debug!("Event count matches $TOT keyword: {} events", df_events);
234            }
235        }
236
237        // Log compensation status
238        let has_compensation = fcs.has_compensation();
239        debug!(
240            "Compensation: {} (SPILLOVER keyword {})",
241            if has_compensation {
242                "available"
243            } else {
244                "not available"
245            },
246            if has_compensation {
247                "present"
248            } else {
249                "missing"
250            }
251        );
252
253        // Log parameter count
254        let n_params = fcs.parameters.len();
255        debug!(
256            "FCS file loaded: {} parameters, {} events",
257            n_params, df_events
258        );
259
260        Ok(fcs)
261    }
262
263    /// Validates that the file extension is `.fcs`
264    /// # Errors
265    /// Will return `Err` if the file extension is not `.fcs`
266    fn validate_fcs_extension(path: &Path) -> Result<()> {
267        let extension = path
268            .extension()
269            .ok_or_else(|| anyhow!("File has no extension"))?
270            .to_str()
271            .ok_or_else(|| anyhow!("File extension is not valid UTF-8"))?;
272
273        if extension.to_lowercase() != "fcs" {
274            return Err(anyhow!("Invalid file extension: {}", extension));
275        }
276
277        Ok(())
278    }
279
280    /// Reads raw data from FCS file and stores it as a Polars DataFrame
281    /// Returns columnar data optimized for parameter-wise access patterns
282    ///
283    /// This function provides significant performance benefits over row-based array storage:
284    /// - 2-5x faster data filtering and transformations
285    /// - Native columnar storage (optimal for FCS parameter access patterns)
286    /// - Zero-copy operations via Apache Arrow
287    /// - Built-in SIMD acceleration
288    ///
289    /// # Errors
290    /// Will return `Err` if:
291    /// - The data cannot be read
292    /// - The data cannot be converted to f32 values
293    /// - The DataFrame cannot be constructed
294    fn store_raw_data_as_dataframe(
295        header: &Header,
296        mmap: &Mmap,
297        metadata: &Metadata,
298    ) -> Result<EventDataFrame> {
299        // Validate data offset bounds before accessing mmap
300        let mut data_start = *header.data_offset.start();
301        let mut data_end = *header.data_offset.end();
302        let mmap_len = mmap.len();
303
304        // Handle zero offsets by checking keywords
305        if data_start == 0 {
306            if let Ok(begin_data) = metadata.get_integer_keyword("$BEGINDATA") {
307                data_start = begin_data.get_usize().clone();
308            } else {
309                return Err(anyhow!(
310                    "$BEGINDATA keyword not found. Unable to determine data start."
311                ));
312            }
313        }
314
315        if data_end == 0 {
316            if let Ok(end_data) = metadata.get_integer_keyword("$ENDDATA") {
317                data_end = end_data.get_usize().clone();
318            } else {
319                return Err(anyhow!(
320                    "$ENDDATA keyword not found. Unable to determine data end."
321                ));
322            }
323        }
324
325        // Validate offsets
326        if data_start >= mmap_len {
327            return Err(anyhow!(
328                "Data start offset {} is beyond mmap length {}",
329                data_start,
330                mmap_len
331            ));
332        }
333
334        if data_end >= mmap_len {
335            return Err(anyhow!(
336                "Data end offset {} is beyond mmap length {}",
337                data_end,
338                mmap_len
339            ));
340        }
341
342        if data_start > data_end {
343            return Err(anyhow!(
344                "Data start offset {} is greater than end offset {}",
345                data_start,
346                data_end
347            ));
348        }
349
350        // Extract data bytes
351        let data_bytes = &mmap[data_start..=data_end];
352
353        let number_of_parameters = metadata
354            .get_number_of_parameters()
355            .context("Failed to retrieve the number of parameters from metadata")?;
356        let number_of_events = metadata
357            .get_number_of_events()
358            .context("Failed to retrieve the number of events from metadata")?;
359
360        // Calculate bytes per event by summing $PnB / 8 for each parameter
361        // This is more accurate than using $DATATYPE which only provides a default
362        let bytes_per_event = metadata
363            .calculate_bytes_per_event()
364            .context("Failed to calculate bytes per event")?;
365
366        let byte_order = metadata
367            .get_byte_order()
368            .context("Failed to get the byte order from metadata")?;
369
370        // Validate data size
371        let expected_total_bytes = number_of_events * bytes_per_event;
372        if data_bytes.len() < expected_total_bytes {
373            return Err(anyhow!(
374                "Insufficient data: expected {} bytes ({} events × {} bytes/event), but only have {} bytes",
375                expected_total_bytes,
376                number_of_events,
377                bytes_per_event,
378                data_bytes.len()
379            ));
380        }
381
382        // Collect bytes per parameter and data types for each parameter
383        let bytes_per_parameter: Vec<usize> = (1..=*number_of_parameters)
384            .map(|param_num| {
385                metadata
386                    .get_bytes_per_parameter(param_num)
387                    .with_context(|| {
388                        format!(
389                            "Failed to get bytes per parameter for parameter {}",
390                            param_num
391                        )
392                    })
393            })
394            .collect::<Result<Vec<_>>>()?;
395
396        let data_types: Vec<FcsDataType> = (1..=*number_of_parameters)
397            .map(|param_num| {
398                metadata
399                    .get_data_type_for_channel(param_num)
400                    .with_context(|| format!("Failed to get data type for channel {}", param_num))
401            })
402            .collect::<Result<Vec<_>>>()?;
403
404        // Fast path: Check if all parameters are uniform (same bytes, same data type)
405        // This allows us to use bytemuck zero-copy optimization
406        let uniform_bytes = bytes_per_parameter.first().copied();
407        let uniform_data_type = data_types.first().copied();
408        let is_uniform = match (uniform_bytes, uniform_data_type) {
409            (Some(bytes), Some(dt)) => {
410                bytes_per_parameter.iter().all(|&b| b == bytes)
411                    && data_types.iter().all(|&dt_val| dt_val == dt)
412            }
413            _ => false,
414        };
415
416        let f32_values: Vec<f32> = if is_uniform {
417            // Fast path: All parameters have same size and type - use bytemuck for zero-copy
418            let bytes_per_param = uniform_bytes.ok_or_else(|| anyhow!("No uniform bytes found"))?;
419            let data_type =
420                uniform_data_type.ok_or_else(|| anyhow!("No uniform data type found"))?;
421
422            match (data_type, bytes_per_param) {
423                (FcsDataType::F, 4) => {
424                    // Fast path: float32 - use sequential (benchmarks show 2.57x faster than parallel)
425                    let needs_swap = match (byte_order, cfg!(target_endian = "little")) {
426                        (ByteOrder::LittleEndian, true) | (ByteOrder::BigEndian, false) => false,
427                        _ => true,
428                    };
429
430                    match bytemuck::try_cast_slice::<u8, f32>(data_bytes) {
431                        Ok(f32_slice) => {
432                            #[cfg(debug_assertions)]
433                            eprintln!(
434                                "✓ Fast path (bytemuck zero-copy, sequential): {} bytes, {} f32s",
435                                data_bytes.len(),
436                                f32_slice.len()
437                            );
438
439                            if needs_swap {
440                                // Sequential byte swap - faster than parallel for float32
441                                f32_slice
442                                    .iter()
443                                    .map(|&f| f32::from_bits(f.to_bits().swap_bytes()))
444                                    .collect()
445                            } else {
446                                f32_slice.to_vec()
447                            }
448                        }
449                        Err(_) => {
450                            #[cfg(debug_assertions)]
451                            eprintln!(
452                                "⚠ Fast path (bytemuck fallback, sequential): unaligned data ({} bytes)",
453                                data_bytes.len()
454                            );
455
456                            // Fallback: parse in chunks sequentially (faster than parallel for float32)
457                            data_bytes
458                                .chunks_exact(4)
459                                .map(|chunk| {
460                                    let mut bytes = [0u8; 4];
461                                    bytes.copy_from_slice(chunk);
462                                    let bits = u32::from_ne_bytes(bytes);
463                                    let bits = if needs_swap { bits.swap_bytes() } else { bits };
464                                    f32::from_bits(bits)
465                                })
466                                .collect()
467                        }
468                    }
469                }
470                _ => {
471                    // Uniform but not float32 - use optimized bulk parsing
472                    Self::parse_uniform_data_bulk(
473                        data_bytes,
474                        bytes_per_param,
475                        &data_type,
476                        byte_order,
477                        *number_of_events,
478                        *number_of_parameters,
479                    )?
480                }
481            }
482        } else {
483            // Slow path: Variable-width parameters - parse event-by-event
484            Self::parse_variable_width_data(
485                data_bytes,
486                &bytes_per_parameter,
487                &data_types,
488                byte_order,
489                *number_of_events,
490                *number_of_parameters,
491            )?
492        };
493
494        // Create Polars Series for each parameter (column)
495        // FCS data is stored row-wise (event1_param1, event1_param2, ..., event2_param1, ...)
496        // We need to extract columns using stride access
497        let mut columns: Vec<Column> = Vec::with_capacity(*number_of_parameters);
498
499        for param_idx in 0..*number_of_parameters {
500            // Extract this parameter's values across all events
501            // Use iterator with step_by for efficient stride access
502            let param_values: Vec<f32> = f32_values
503                .iter()
504                .skip(param_idx)
505                .step_by(*number_of_parameters)
506                .copied()
507                .collect();
508
509            // Verify we got the right number of events
510            assert_eq!(
511                param_values.len(),
512                *number_of_events,
513                "Parameter {} should have {} events, got {}",
514                param_idx + 1,
515                number_of_events,
516                param_values.len()
517            );
518
519            // Get parameter name from metadata for column name
520            let param_name = metadata
521                .get_parameter_channel_name(param_idx + 1)
522                .map(|s| s.to_string())
523                .unwrap_or_else(|_| format!("P{}", param_idx + 1));
524
525            // Create Series (Polars column) with name
526            let series = Column::new(param_name.as_str().into(), param_values);
527            columns.push(series);
528        }
529
530        // Create DataFrame from columns (height = number of events)
531        let df = DataFrame::new(*number_of_events, columns).map_err(|e| {
532            anyhow!(
533                "Failed to create DataFrame from {} columns: {}",
534                number_of_parameters,
535                e
536            )
537        })?;
538
539        // Verify DataFrame shape
540        assert_eq!(
541            df.height(),
542            *number_of_events,
543            "DataFrame height {} doesn't match expected events {}",
544            df.height(),
545            number_of_events
546        );
547        assert_eq!(
548            df.width(),
549            *number_of_parameters,
550            "DataFrame width {} doesn't match expected parameters {}",
551            df.width(),
552            number_of_parameters
553        );
554
555        #[cfg(debug_assertions)]
556        eprintln!(
557            "✓ Created DataFrame: {} events × {} parameters",
558            df.height(),
559            df.width()
560        );
561
562        Ok(Arc::new(df))
563    }
564
565    /// Parse uniform data in bulk (all parameters have same size and type)
566    ///
567    /// This is faster than event-by-event parsing when all parameters are uniform.
568    /// Uses conditional parallelization based on data type and size:
569    /// - float32: always sequential (benchmarks show 2.57x faster)
570    /// - int16/int32: parallel only above threshold (parallel is 1.84x faster for large datasets)
571    /// - float64: parallel only above threshold
572    ///
573    /// # Arguments
574    /// * `data_bytes` - Raw data bytes
575    /// * `bytes_per_param` - Bytes per parameter (same for all)
576    /// * `data_type` - Data type (same for all)
577    /// * `byte_order` - Byte order
578    /// * `num_events` - Number of events
579    /// * `num_params` - Number of parameters
580    ///
581    /// # Errors
582    /// Will return `Err` if parsing fails
583    #[inline]
584    fn parse_uniform_data_bulk(
585        data_bytes: &[u8],
586        bytes_per_param: usize,
587        data_type: &FcsDataType,
588        byte_order: &ByteOrder,
589        num_events: usize,
590        num_params: usize,
591    ) -> Result<Vec<f32>> {
592        let total_values = num_events * num_params;
593        let use_parallel = total_values > PARALLEL_THRESHOLD;
594        let mut f32_values = Vec::with_capacity(total_values);
595
596        match (data_type, bytes_per_param) {
597            (FcsDataType::I, 2) => {
598                // int16 - parallel is 1.84x faster for large datasets
599                if use_parallel {
600                    data_bytes
601                        .par_chunks_exact(2)
602                        .map(|chunk| {
603                            let value = match byte_order {
604                                ByteOrder::LittleEndian => LE::read_u16(chunk),
605                                ByteOrder::BigEndian => BE::read_u16(chunk),
606                            };
607                            value as f32
608                        })
609                        .collect_into_vec(&mut f32_values);
610                } else {
611                    // Sequential for small datasets
612                    f32_values = data_bytes
613                        .chunks_exact(2)
614                        .map(|chunk| {
615                            let value = match byte_order {
616                                ByteOrder::LittleEndian => LE::read_u16(chunk),
617                                ByteOrder::BigEndian => BE::read_u16(chunk),
618                            };
619                            value as f32
620                        })
621                        .collect();
622                }
623            }
624            (FcsDataType::I, 4) => {
625                // int32 - parallel only above threshold
626                if use_parallel {
627                    data_bytes
628                        .par_chunks_exact(4)
629                        .map(|chunk| {
630                            let value = match byte_order {
631                                ByteOrder::LittleEndian => LE::read_u32(chunk),
632                                ByteOrder::BigEndian => BE::read_u32(chunk),
633                            };
634                            value as f32
635                        })
636                        .collect_into_vec(&mut f32_values);
637                } else {
638                    // Sequential for small datasets
639                    f32_values = data_bytes
640                        .chunks_exact(4)
641                        .map(|chunk| {
642                            let value = match byte_order {
643                                ByteOrder::LittleEndian => LE::read_u32(chunk),
644                                ByteOrder::BigEndian => BE::read_u32(chunk),
645                            };
646                            value as f32
647                        })
648                        .collect();
649                }
650            }
651            (FcsDataType::F, 4) => {
652                // float32 - always sequential (benchmarks show 2.57x faster than parallel)
653                // This is a fallback path - normally handled by bytemuck in store_raw_data_as_dataframe
654                let needs_swap = match (byte_order, cfg!(target_endian = "little")) {
655                    (ByteOrder::LittleEndian, true) | (ByteOrder::BigEndian, false) => false,
656                    _ => true,
657                };
658                f32_values = data_bytes
659                    .chunks_exact(4)
660                    .map(|chunk| {
661                        let mut bytes = [0u8; 4];
662                        bytes.copy_from_slice(chunk);
663                        let bits = u32::from_ne_bytes(bytes);
664                        let bits = if needs_swap { bits.swap_bytes() } else { bits };
665                        f32::from_bits(bits)
666                    })
667                    .collect();
668            }
669            (FcsDataType::D, 8) => {
670                // float64 - parallel only above threshold
671                if use_parallel {
672                    data_bytes
673                        .par_chunks_exact(8)
674                        .map(|chunk| {
675                            let value = match byte_order {
676                                ByteOrder::LittleEndian => LE::read_f64(chunk),
677                                ByteOrder::BigEndian => BE::read_f64(chunk),
678                            };
679                            value as f32
680                        })
681                        .collect_into_vec(&mut f32_values);
682                } else {
683                    // Sequential for small datasets
684                    f32_values = data_bytes
685                        .chunks_exact(8)
686                        .map(|chunk| {
687                            let value = match byte_order {
688                                ByteOrder::LittleEndian => LE::read_f64(chunk),
689                                ByteOrder::BigEndian => BE::read_f64(chunk),
690                            };
691                            value as f32
692                        })
693                        .collect();
694                }
695            }
696            _ => {
697                return Err(anyhow!(
698                    "Unsupported uniform data type: {:?} with {} bytes",
699                    data_type,
700                    bytes_per_param
701                ));
702            }
703        }
704
705        Ok(f32_values)
706    }
707
708    /// Parse a parameter value from bytes to f32 based on data type and bytes per parameter
709    ///
710    /// Handles different data types:
711    /// - int16 (2 bytes) - unsigned integer
712    /// - int32 (4 bytes) - unsigned integer
713    /// - float32 (4 bytes) - single-precision floating point
714    /// - float64 (8 bytes) - double-precision floating point
715    ///
716    /// # Arguments
717    /// * `bytes` - Raw bytes for the parameter value
718    /// * `bytes_per_param` - Number of bytes per parameter (from $PnB / 8)
719    /// * `data_type` - Data type (I, F, or D)
720    /// * `byte_order` - Byte order of the file
721    ///
722    /// # Errors
723    /// Will return `Err` if the bytes cannot be parsed according to the data type
724    #[cold]
725    fn parse_parameter_value_to_f32(
726        bytes: &[u8],
727        bytes_per_param: usize,
728        data_type: &FcsDataType,
729        byte_order: &ByteOrder,
730    ) -> Result<f32> {
731        match (data_type, bytes_per_param) {
732            (FcsDataType::I, 2) => {
733                // int16 (unsigned 16-bit integer)
734                let value = match byte_order {
735                    ByteOrder::LittleEndian => LE::read_u16(bytes),
736                    ByteOrder::BigEndian => BE::read_u16(bytes),
737                };
738                Ok(value as f32)
739            }
740            (FcsDataType::I, 4) => {
741                // int32 (unsigned 32-bit integer)
742                let value = match byte_order {
743                    ByteOrder::LittleEndian => LE::read_u32(bytes),
744                    ByteOrder::BigEndian => BE::read_u32(bytes),
745                };
746                Ok(value as f32)
747            }
748            (FcsDataType::F, 4) => {
749                // float32 (single-precision floating point)
750                Ok(byte_order.read_f32(bytes))
751            }
752            (FcsDataType::D, 8) => {
753                // float64 (double-precision floating point) - convert to f32
754                let value = match byte_order {
755                    ByteOrder::LittleEndian => LE::read_f64(bytes),
756                    ByteOrder::BigEndian => BE::read_f64(bytes),
757                };
758                Ok(value as f32)
759            }
760            (FcsDataType::I, _) => Err(anyhow!(
761                "Unsupported integer size: {} bytes (expected 2 or 4)",
762                bytes_per_param
763            )),
764            (FcsDataType::F, _) => Err(anyhow!(
765                "Invalid float32 size: {} bytes (expected 4)",
766                bytes_per_param
767            )),
768            (FcsDataType::D, _) => Err(anyhow!(
769                "Invalid float64 size: {} bytes (expected 8)",
770                bytes_per_param
771            )),
772            (FcsDataType::A, _) => Err(anyhow!("ASCII data type not supported")),
773        }
774    }
775
776    /// Parse variable-width data event-by-event (cold path)
777    ///
778    /// This is the slower path used when parameters have different sizes/types.
779    /// Marked as `#[cold]` to help the compiler optimize the hot path.
780    ///
781    /// # Arguments
782    /// * `data_bytes` - Raw data bytes
783    /// * `bytes_per_parameter` - Bytes per parameter for each parameter
784    /// * `data_types` - Data type for each parameter
785    /// * `byte_order` - Byte order
786    /// * `num_events` - Number of events
787    /// * `num_params` - Number of parameters
788    ///
789    /// # Errors
790    /// Will return `Err` if parsing fails
791    #[cold]
792    fn parse_variable_width_data(
793        data_bytes: &[u8],
794        bytes_per_parameter: &[usize],
795        data_types: &[FcsDataType],
796        byte_order: &ByteOrder,
797        num_events: usize,
798        num_params: usize,
799    ) -> Result<Vec<f32>> {
800        let mut f32_values: Vec<f32> = Vec::with_capacity(num_events * num_params);
801        let mut data_offset = 0;
802
803        for event_idx in 0..num_events {
804            for (param_idx, &bytes_per_param) in bytes_per_parameter.iter().enumerate() {
805                let param_num = param_idx + 1;
806                let data_type = &data_types[param_idx];
807
808                // Extract bytes for this parameter value
809                if data_offset + bytes_per_param > data_bytes.len() {
810                    return Err(anyhow!(
811                        "Insufficient data at event {}, parameter {}: need {} bytes but only have {} remaining",
812                        event_idx + 1,
813                        param_num,
814                        bytes_per_param,
815                        data_bytes.len() - data_offset
816                    ));
817                }
818
819                let param_bytes = &data_bytes[data_offset..data_offset + bytes_per_param];
820                let f32_value = Self::parse_parameter_value_to_f32(
821                    param_bytes,
822                    bytes_per_param,
823                    data_type,
824                    byte_order,
825                )
826                .map_err(|e| anyhow!("Failed to parse parameter {} value: {}", param_num, e))?;
827
828                f32_values.push(f32_value);
829                data_offset += bytes_per_param;
830            }
831        }
832
833        Ok(f32_values)
834    }
835
836    /// Looks for the parameter name as a key in the `parameters` hashmap and returns a reference to it
837    /// Performs case-insensitive lookup for parameter names
838    /// # Errors
839    /// Will return `Err` if the parameter name is not found in the `parameters` hashmap
840    pub fn find_parameter(&self, parameter_name: &str) -> Result<&Parameter> {
841        // Try exact match first (fast path)
842        if let Some(param) = self.parameters.get(parameter_name) {
843            return Ok(param);
844        }
845
846        // Case-insensitive fallback: search through parameter map
847        for (key, param) in self.parameters.iter() {
848            if key.eq_ignore_ascii_case(parameter_name) {
849                return Ok(param);
850            }
851        }
852
853        Err(anyhow!("Parameter not found: {parameter_name}"))
854    }
855
856    /// Looks for the parameter name as a key in the `parameters` hashmap and returns a mutable reference to it
857    /// Performs case-insensitive lookup for parameter names
858    /// # Errors
859    /// Will return `Err` if the parameter name is not found in the `parameters` hashmap
860    pub fn find_mutable_parameter(&mut self, parameter_name: &str) -> Result<&mut Parameter> {
861        // Try exact match first (fast path)
862        // Note: We need to check if the key exists as Arc<str>, so we iterate to find exact match
863        let exact_key = self
864            .parameters
865            .keys()
866            .find(|k| k.as_ref() == parameter_name)
867            .map(|k| k.clone());
868
869        if let Some(key) = exact_key {
870            return self
871                .parameters
872                .get_mut(&key)
873                .ok_or_else(|| anyhow!("Parameter not found: {parameter_name}"));
874        }
875
876        // Case-insensitive fallback: find the key first (clone Arc to avoid borrow issues)
877        let matching_key = self
878            .parameters
879            .keys()
880            .find(|key| key.eq_ignore_ascii_case(parameter_name))
881            .map(|k| k.clone());
882
883        if let Some(key) = matching_key {
884            return self
885                .parameters
886                .get_mut(&key)
887                .ok_or_else(|| anyhow!("Parameter not found: {parameter_name}"));
888        }
889
890        Err(anyhow!("Parameter not found: {parameter_name}"))
891    }
892
893    /// Returns a zero-copy reference to a Polars Float32Chunked view of a column for the parameter
894    ///
895    /// This provides access to the underlying Polars chunked array, which is useful
896    /// for operations that work directly with Polars types. For most use cases,
897    /// `get_parameter_events_slice()` is preferred as it provides a simple `&[f32]` slice.
898    ///
899    /// # Arguments
900    /// * `channel_name` - The channel name (e.g., "FSC-A", "FL1-A")
901    ///
902    /// # Errors
903    /// Will return `Err` if:
904    /// - the parameter name is not found in the parameters map
905    /// - the column data type is not Float32
906    pub fn get_parameter_events(&'_ self, channel_name: &str) -> Result<&Float32Chunked> {
907        Ok(self
908            .get_parameter_column(channel_name)?
909            .f32()
910            .map_err(|e| anyhow!("Parameter {} is not f32 type: {}", channel_name, e))?)
911    }
912    /// Get a reference to the Polars Column for a parameter by channel name
913    ///
914    /// This provides direct access to the underlying Polars column, which can be useful
915    /// for advanced operations that require the full Polars API.
916    ///
917    /// # Arguments
918    /// * `channel_name` - The channel name (e.g., "FSC-A", "FL1-A")
919    ///
920    /// # Errors
921    /// Will return `Err` if the parameter name is not found in the DataFrame
922    pub fn get_parameter_column(&'_ self, channel_name: &str) -> Result<&Column> {
923        self.data_frame
924            .column(channel_name)
925            .map_err(|e| anyhow!("Parameter {} not found: {}", channel_name, e))
926    }
927
928    /// Looks for the parameter name as a key in the 'parameters' hashmap and returns a new Vec<f32> of the raw event data
929    /// NOTE: This allocates a full copy of the events - prefer `get_parameter_events_slice` when possible
930    /// # Errors
931    /// Will return 'Err' if the parameter name is not found in the 'parameters hashmap or if the events are not found
932    pub fn get_parameter_events_as_owned_vec(&self, channel_name: &str) -> Result<Vec<EventDatum>> {
933        Ok(self.get_parameter_events_slice(channel_name)?.to_vec())
934    }
935
936    /// Returns the minimum and maximum values of the parameter
937    /// # Errors
938    /// Will return `Err` if the parameter name is not found in the 'parameters' hashmap or if the events are not found
939    pub fn get_minmax_of_parameter(&self, channel_name: &str) -> Result<(EventDatum, EventDatum)> {
940        let parameter = self.find_parameter(channel_name)?;
941        let events = self.get_parameter_events(&parameter.channel_name)?;
942
943        match events.iter().minmax() {
944            MinMaxResult::NoElements => Err(anyhow!("No elements found")),
945            MinMaxResult::OneElement(e) => Err(anyhow!("Only one element found: {:?}", e)),
946            MinMaxResult::MinMax(min, max) => {
947                let min_val = min.ok_or_else(|| anyhow!("Min value is None"))?;
948                let max_val = max.ok_or_else(|| anyhow!("Max value is None"))?;
949                Ok((min_val, max_val))
950            }
951        }
952    }
953
954    /// Creates a new `HashMap` of `Parameter`s
955    /// using the `Fcs` file's metadata to find the channel and label names from the `PnN` and `PnS` keywords.
956    /// Does NOT store events on the parameter.
957    /// # Errors
958    /// Will return `Err` if:
959    /// - the number of parameters cannot be found in the metadata,
960    /// - the parameter name cannot be found in the metadata,
961    /// - the parameter cannot be built (using the Builder pattern)
962    /// Format diagnostic information about FCS file state for error messages
963    fn format_diagnostic_info(header: &Header, metadata: &Metadata, path: &Path) -> String {
964        let mut lines = Vec::new();
965
966        lines.push("=== FCS File Diagnostic Information ===".to_string());
967        lines.push(format!("File path: {}", path.display()));
968        lines.push(format!("FCS Version: {}", header.version));
969        lines.push(format!(
970            "Text segment: {}..={}",
971            header.text_offset.start(),
972            header.text_offset.end()
973        ));
974        lines.push(format!(
975            "Data segment: {}..={}",
976            header.data_offset.start(),
977            header.data_offset.end()
978        ));
979        lines.push(format!(
980            "Analysis segment: {}..={}",
981            header.analysis_offset.start(),
982            header.analysis_offset.end()
983        ));
984        lines.push(format!(
985            "Delimiter: '{}' (0x{:02x})",
986            metadata.delimiter, metadata.delimiter as u8
987        ));
988
989        // Get number of parameters
990        let n_params = metadata
991            .get_number_of_parameters()
992            .map(|n| format!("{}", n))
993            .unwrap_or_else(|e| format!("Error: {}", e));
994        lines.push(format!("Number of parameters ($PAR): {}", n_params));
995
996        // Get number of events
997        let n_events = metadata
998            .get_number_of_events()
999            .map(|n| format!("{}", n))
1000            .unwrap_or_else(|e| format!("Error: {}", e));
1001        lines.push(format!("Number of events ($TOT): {}", n_events));
1002
1003        // List all parameter keywords
1004        lines.push("\n=== Parameter Keywords ===".to_string());
1005        let number_of_parameters = metadata.get_number_of_parameters().ok().copied();
1006        if let Some(n_params) = number_of_parameters {
1007            for param_num in 1..=n_params {
1008                let pn_keywords = [
1009                    format!("$P{}N", param_num),
1010                    format!("$P{}S", param_num),
1011                    format!("$P{}B", param_num),
1012                    format!("$P{}E", param_num),
1013                    format!("$P{}R", param_num),
1014                ];
1015
1016                let mut found_keywords = Vec::new();
1017                for key in &pn_keywords {
1018                    if let Some(kw) = metadata.keywords.get(key) {
1019                        let value = match kw {
1020                            crate::keyword::Keyword::String(sk) => sk.get_str().to_string(),
1021                            crate::keyword::Keyword::Int(ik) => ik.get_usize().to_string(),
1022                            crate::keyword::Keyword::Float(fk) => fk.to_string(),
1023                            crate::keyword::Keyword::Byte(bk) => bk.get_str().to_string(),
1024                            crate::keyword::Keyword::Mixed(mk) => mk.to_string(),
1025                        };
1026                        found_keywords.push(format!("  {} = {}", key, value));
1027                    } else {
1028                        found_keywords.push(format!("  {} = <MISSING>", key));
1029                    }
1030                }
1031                lines.push(format!("Parameter {}:", param_num));
1032                lines.extend(found_keywords);
1033            }
1034        } else {
1035            lines.push("  Could not determine number of parameters".to_string());
1036        }
1037
1038        // List all keywords sorted
1039        lines.push("\n=== All Keywords (sorted) ===".to_string());
1040        let mut sorted_keys: Vec<_> = metadata.keywords.keys().collect();
1041        sorted_keys.sort();
1042        for key in sorted_keys {
1043            let kw = match metadata.keywords.get(key) {
1044                Some(kw) => kw,
1045                None => continue, // Skip missing keywords in diagnostic output
1046            };
1047            let value = match kw {
1048                crate::keyword::Keyword::String(sk) => sk.get_str().to_string(),
1049                crate::keyword::Keyword::Int(ik) => ik.get_usize().to_string(),
1050                crate::keyword::Keyword::Float(fk) => fk.to_string(),
1051                crate::keyword::Keyword::Byte(bk) => bk.get_str().to_string(),
1052                crate::keyword::Keyword::Mixed(mk) => mk.to_string(),
1053            };
1054            lines.push(format!("  {} = {}", key, value));
1055        }
1056
1057        lines.join("\n")
1058    }
1059
1060    pub fn generate_parameter_map(metadata: &Metadata) -> Result<ParameterMap> {
1061        let mut map = ParameterMap::default();
1062        let number_of_parameters = metadata.get_number_of_parameters()?;
1063        for parameter_number in 1..=*number_of_parameters {
1064            let channel_name = metadata.get_parameter_channel_name(parameter_number)
1065                .map_err(|e| anyhow!(
1066                    "Failed to get channel name for parameter {}: {}\n\nHint: Check that $P{}N keyword exists in metadata.",
1067                    parameter_number,
1068                    e,
1069                    parameter_number
1070                ))?;
1071
1072            // Use label name or fallback to the parameter name
1073            let label_name = match metadata.get_parameter_label(parameter_number) {
1074                Ok(label) => label,
1075                Err(_) => channel_name,
1076            };
1077
1078            let transform = if channel_name.contains("FSC")
1079                || channel_name.contains("SSC")
1080                || channel_name.contains("Time")
1081            {
1082                TransformType::Linear
1083            } else {
1084                TransformType::default()
1085            };
1086
1087            // Get excitation wavelength from metadata if available
1088            let excitation_wavelength = metadata
1089                .get_parameter_excitation_wavelength(parameter_number)
1090                .ok()
1091                .flatten();
1092
1093            let parameter = ParameterBuilder::default()
1094                // For the ParameterBuilder, ensure we're using the proper methods
1095                // that may be defined by the Builder derive macro
1096                .parameter_number(parameter_number)
1097                .channel_name(channel_name)
1098                .label_name(label_name)
1099                .transform(transform)
1100                .excitation_wavelength(excitation_wavelength)
1101                .build()?;
1102
1103            // Add the parameter events to the hashmap keyed by the parameter name
1104            map.insert(channel_name.to_string().into(), parameter);
1105        }
1106
1107        Ok(map)
1108    }
1109
1110    /// Looks for a keyword among the metadata and returns its value as a `&str`
1111    /// # Errors
1112    /// Will return `Err` if the `Keyword` is not found in the `metadata` or if the `Keyword` cannot be converted to a `&str`
1113    pub fn get_keyword_string_value(&self, keyword: &str) -> Result<Cow<'_, str>> {
1114        // TODO: This should be a match statement
1115        if let Ok(keyword) = self.metadata.get_string_keyword(keyword) {
1116            Ok(keyword.get_str())
1117        } else if let Ok(keyword) = self.metadata.get_integer_keyword(keyword) {
1118            Ok(keyword.get_str())
1119        } else if let Ok(keyword) = self.metadata.get_float_keyword(keyword) {
1120            Ok(keyword.get_str())
1121        } else if let Ok(keyword) = self.metadata.get_byte_keyword(keyword) {
1122            Ok(keyword.get_str())
1123        } else if let Ok(keyword) = self.metadata.get_mixed_keyword(keyword) {
1124            Ok(keyword.get_str())
1125        } else {
1126            Err(anyhow!("Keyword not found: {}", keyword))
1127        }
1128    }
1129    /// A convenience function to return the `GUID` keyword from the `metadata` as a `&str`
1130    /// # Errors
1131    /// Will return `Err` if the `GUID` keyword is not found in the `metadata` or if the `GUID` keyword cannot be converted to a `&str`
1132    pub fn get_guid(&self) -> Result<Cow<'_, str>> {
1133        Ok(self.metadata.get_string_keyword("GUID")?.get_str())
1134    }
1135
1136    /// Set or update the GUID keyword in the file's metadata
1137    pub fn set_guid(&mut self, guid: String) {
1138        self.metadata
1139            .insert_string_keyword("GUID".to_string(), guid);
1140    }
1141
1142    /// A convenience function to return the `$FIL` keyword from the `metadata` as a `&str`
1143    /// # Errors
1144    /// Will return `Err` if the `$FIL` keyword is not found in the `metadata` or if the `$FIL` keyword cannot be converted to a `&str`
1145    pub fn get_fil_keyword(&self) -> Result<Cow<'_, str>> {
1146        Ok(self.metadata.get_string_keyword("$FIL")?.get_str())
1147    }
1148
1149    /// A convenience function to return the `$TOT` keyword from the `metadata` as a `usize`
1150    /// # Errors
1151    /// Will return `Err` if the `$TOT` keyword is not found in the `metadata` or if the `$TOT` keyword cannot be converted to a `usize`
1152    pub fn get_number_of_events(&self) -> Result<&usize> {
1153        self.metadata.get_number_of_events()
1154    }
1155
1156    /// A convenience function to return the `$PAR` keyword from the `metadata` as a `usize`
1157    /// # Errors
1158    /// Will return `Err` if the `$PAR` keyword is not found in the `metadata` or if the `$PAR` keyword cannot be converted to a `usize`
1159    pub fn get_number_of_parameters(&self) -> Result<&usize> {
1160        self.metadata.get_number_of_parameters()
1161    }
1162
1163    // ==================== NEW POLARS-BASED ACCESSOR METHODS ====================
1164
1165    /// Get events for a parameter as a slice of f32 values
1166    /// Polars gives us direct access to the underlying buffer (zero-copy)
1167    /// # Errors
1168    /// Will return `Err` if:
1169    /// - the parameter name is not found
1170    /// - the Series data type is not Float32
1171    /// - the data is chunked (rare for FCS files)
1172    pub fn get_parameter_events_slice(&self, channel_name: &str) -> Result<&[f32]> {
1173        self.get_parameter_events(channel_name)?
1174            .cont_slice()
1175            .map_err(|e| anyhow!("Parameter {} data is not contiguous: {}", channel_name, e))
1176    }
1177
1178    /// Get two parameters as (x, y) pairs for plotting
1179    /// Optimized for scatter plot use case with zero allocations until the collect
1180    /// # Errors
1181    /// Will return `Err` if either parameter name is not found
1182    pub fn get_xy_pairs(&self, x_param: &str, y_param: &str) -> Result<Vec<(f32, f32)>> {
1183        let x_data = self.get_parameter_events_slice(x_param)?;
1184        let y_data = self.get_parameter_events_slice(y_param)?;
1185
1186        // Verify both parameters have the same length
1187        if x_data.len() != y_data.len() {
1188            return Err(anyhow!(
1189                "Parameter length mismatch: {} has {} events, {} has {} events",
1190                x_param,
1191                x_data.len(),
1192                y_param,
1193                y_data.len()
1194            ));
1195        }
1196
1197        // Zip is zero-cost abstraction - uses iterators efficiently
1198        Ok(x_data
1199            .iter()
1200            .zip(y_data.iter())
1201            .map(|(&x, &y)| (x, y))
1202            .collect())
1203    }
1204
1205    /// Get DataFrame height (number of events)
1206    #[must_use]
1207    pub fn get_event_count_from_dataframe(&self) -> usize {
1208        self.data_frame.height()
1209    }
1210
1211    /// Get DataFrame width (number of parameters)
1212    #[must_use]
1213    pub fn get_parameter_count_from_dataframe(&self) -> usize {
1214        self.data_frame.width()
1215    }
1216
1217    /// Get DataFrame column names (parameter names)
1218    pub fn get_parameter_names_from_dataframe(&self) -> Vec<String> {
1219        self.data_frame
1220            .get_column_names()
1221            .into_iter()
1222            .map(|s| s.to_string())
1223            .collect()
1224    }
1225
1226    /// Aggregate statistics for a parameter using Polars' streaming API for low memory usage and minimal, chunked passes.
1227    ///
1228    /// When streaming is enabled, Polars creates a *Pipeline*:
1229    ///
1230    /// **Source**: It pulls a chunk of data from the disk (e.g., 50,000 rows).
1231    ///
1232    /// **Operators**: It passes that chunk through your expressions (calculating the running sum, count, min, and max for that specific chunk).
1233    ///
1234    /// **Sink**: It aggregates the results from all chunks into a final result.
1235    ///
1236    /// Because the statistics we are calculating (min, max, mean) are *associative* and *commutative*, Polars can calculate them partially on each chunk and then combine them at the very end.
1237    ///
1238    /// Returns (min, max, mean, std_dev)
1239    /// # Errors
1240    /// Will return `Err` if the parameter is not found or stats calculation fails
1241    pub fn get_parameter_statistics(&self, channel_name: &str) -> Result<(f32, f32, f32, f32)> {
1242        let stats = (*self.data_frame)
1243            .clone()
1244            .lazy()
1245            .select([
1246                col(channel_name).min().alias("min"),
1247                col(channel_name).max().alias("max"),
1248                col(channel_name).mean().alias("mean"),
1249                col(channel_name).std(1).alias("std"),
1250            ])
1251            .collect_with_engine(Engine::Streaming)?;
1252        let min = stats
1253            .column("min")
1254            .map_err(|e| anyhow!("Column 'min' not found in statistics: {}", e))?
1255            .f32()
1256            .map_err(|e| anyhow!("Column 'min' is not f32 type: {}", e))?
1257            .get(0)
1258            .ok_or_else(|| anyhow!("No min value found"))?;
1259        let max = stats
1260            .column("max")
1261            .map_err(|e| anyhow!("Column 'max' not found in statistics: {}", e))?
1262            .f32()
1263            .map_err(|e| anyhow!("Column 'max' is not f32 type: {}", e))?
1264            .get(0)
1265            .ok_or_else(|| anyhow!("No max value found"))?;
1266        let mean = stats
1267            .column("mean")
1268            .map_err(|e| anyhow!("Column 'mean' not found in statistics: {}", e))?
1269            .f32()
1270            .map_err(|e| anyhow!("Column 'mean' is not f32 type: {}", e))?
1271            .get(0)
1272            .ok_or_else(|| anyhow!("No mean value found"))?;
1273        let std = stats
1274            .column("std")
1275            .map_err(|e| anyhow!("Column 'std' not found in statistics: {}", e))?
1276            .f32()
1277            .map_err(|e| anyhow!("Column 'std' is not f32 type: {}", e))?
1278            .get(0)
1279            .ok_or_else(|| anyhow!("No std deviation value found"))?;
1280
1281        Ok((min, max, mean, std))
1282    }
1283
1284    // ==================== TRANSFORMATION METHODS ====================
1285
1286    /// Apply arcsinh transformation to a parameter using Polars
1287    /// This is the most common transformation for flow cytometry data
1288    /// Formula: arcsinh(x / cofactor)
1289    ///
1290    /// # Arguments
1291    /// * `parameter_name` - Name of the parameter to transform
1292    /// * `cofactor` - Scaling factor (typical: 150-200 for modern instruments)
1293    ///
1294    /// # Returns
1295    /// New DataFrame with the transformed parameter
1296    pub fn apply_arcsinh_transform(
1297        &self,
1298        parameter_name: &str,
1299        cofactor: f32,
1300    ) -> Result<EventDataFrame> {
1301        let df = (*self.data_frame).clone();
1302
1303        // Get the column to transform
1304        let col = df
1305            .column(parameter_name)
1306            .map_err(|e| anyhow!("Parameter {} not found: {}", parameter_name, e))?;
1307
1308        let series = col.as_materialized_series();
1309        let ca = series
1310            .f32()
1311            .map_err(|e| anyhow!("Parameter {} is not f32: {}", parameter_name, e))?;
1312
1313        // Apply arcsinh transformation using TransformType implementation
1314        // The division by ln(10) was incorrectly converting to log10 scale,
1315        // which compressed the data ~2.3x and caused MAD to over-remove events
1316        use rayon::prelude::*;
1317        let transform = TransformType::Arcsinh { cofactor };
1318        let transformed: Vec<f32> = ca
1319            .cont_slice()
1320            .map_err(|e| anyhow!("Data not contiguous: {}", e))?
1321            .par_iter()
1322            .map(|&x| transform.transform(&x))
1323            .collect();
1324
1325        // Create new column with transformed data
1326        let new_series = Series::new(parameter_name.into(), transformed);
1327
1328        // Replace the column in DataFrame
1329        let mut new_df = df;
1330        new_df
1331            .replace(parameter_name, new_series.into())
1332            .map_err(|e| anyhow!("Failed to replace column: {}", e))?;
1333
1334        Ok(Arc::new(new_df))
1335    }
1336
1337    /// Apply arcsinh transformation to multiple parameters
1338    ///
1339    /// # Arguments
1340    /// * `parameters` - List of (parameter_name, cofactor) pairs
1341    ///
1342    /// # Returns
1343    /// New DataFrame with all specified parameters transformed
1344    pub fn apply_arcsinh_transforms(&self, parameters: &[(&str, f32)]) -> Result<EventDataFrame> {
1345        let mut df = (*self.data_frame).clone();
1346
1347        use rayon::prelude::*;
1348
1349        for &(param_name, cofactor) in parameters {
1350            let col = df
1351                .column(param_name)
1352                .map_err(|e| anyhow!("Parameter {} not found: {}", param_name, e))?;
1353
1354            let series = col.as_materialized_series();
1355            let ca = series
1356                .f32()
1357                .map_err(|e| anyhow!("Parameter {} is not f32: {}", param_name, e))?;
1358
1359            // Apply arcsinh transformation using TransformType implementation
1360            // Standard flow cytometry arcsinh - no division by ln(10)
1361            let transform = TransformType::Arcsinh { cofactor };
1362            let transformed: Vec<f32> = ca
1363                .cont_slice()
1364                .map_err(|e| anyhow!("Data not contiguous: {}", e))?
1365                .par_iter()
1366                .map(|&x| transform.transform(&x))
1367                .collect();
1368
1369            let new_series = Series::new(param_name.into(), transformed);
1370            df.replace(param_name, new_series.into())
1371                .map_err(|e| anyhow!("Failed to replace column {}: {}", param_name, e))?;
1372        }
1373
1374        Ok(Arc::new(df))
1375    }
1376
1377    /// Apply default arcsinh transformation to all fluorescence parameters
1378    /// Automatically detects fluorescence parameters (excludes FSC, SSC, Time)
1379    /// Uses cofactor = 200 (good default for modern instruments)
1380    pub fn apply_default_arcsinh_transform(&self) -> Result<EventDataFrame> {
1381        let param_names = self.get_parameter_names_from_dataframe();
1382
1383        // Filter to fluorescence parameters (exclude scatter and time)
1384        let fluor_params: Vec<(&str, f32)> = param_names
1385            .iter()
1386            .filter(|name| {
1387                let upper = name.to_uppercase();
1388                !upper.contains("FSC") && !upper.contains("SSC") && !upper.contains("TIME")
1389            })
1390            .map(|name| (name.as_str(), 2000.0)) // Default cofactor = 2000
1391            .collect();
1392
1393        self.apply_arcsinh_transforms(&fluor_params)
1394    }
1395
1396    /// Apply biexponential (logicle) transformation matching FlowJo defaults
1397    /// Automatically detects fluorescence parameters (excludes FSC, SSC, Time)
1398    /// Uses FlowJo default parameters: top_of_scale=262144 (18-bit), positive_decades=4.5, negative_decades=0, width=0.5
1399    pub fn apply_default_biexponential_transform(&self) -> Result<EventDataFrame> {
1400        let param_names = self.get_parameter_names_from_dataframe();
1401
1402        // Filter to fluorescence parameters (exclude scatter and time)
1403        let fluor_params: Vec<&str> = param_names
1404            .iter()
1405            .filter(|name| {
1406                let upper = name.to_uppercase();
1407                !upper.contains("FSC") && !upper.contains("SSC") && !upper.contains("TIME")
1408            })
1409            .map(|name| name.as_str())
1410            .collect();
1411
1412        let mut df = (*self.data_frame).clone();
1413
1414        use rayon::prelude::*;
1415
1416        // FlowJo default biexponential parameters
1417        let transform = TransformType::Biexponential {
1418            top_of_scale: 262144.0, // 18-bit data (2^18)
1419            positive_decades: 4.5,
1420            negative_decades: 0.0,
1421            width: 0.5,
1422        };
1423
1424        for param_name in fluor_params {
1425            let col = df
1426                .column(param_name)
1427                .map_err(|e| anyhow!("Parameter {} not found: {}", param_name, e))?;
1428
1429            let series = col.as_materialized_series();
1430            let ca = series
1431                .f32()
1432                .map_err(|e| anyhow!("Parameter {} is not f32: {}", param_name, e))?;
1433
1434            // Apply biexponential transformation using TransformType implementation
1435            let transformed: Vec<f32> = ca
1436                .cont_slice()
1437                .map_err(|e| anyhow!("Data not contiguous: {}", e))?
1438                .par_iter()
1439                .map(|&x| transform.transform(&x))
1440                .collect();
1441
1442            let new_series = Series::new(param_name.into(), transformed);
1443            df.replace(param_name, new_series.into())
1444                .map_err(|e| anyhow!("Failed to replace column {}: {}", param_name, e))?;
1445        }
1446
1447        Ok(Arc::new(df))
1448    }
1449
1450    // ==================== COMPENSATION METHODS ====================
1451
1452    /// Extract compensation matrix from $SPILLOVER keyword
1453    /// Returns (matrix, channel_names) if spillover keyword exists
1454    /// Returns None if no spillover keyword is present in the file
1455    ///
1456    /// # Returns
1457    /// Some((compensation_matrix, channel_names)) if spillover exists, None otherwise
1458    ///
1459    /// # Errors
1460    /// Will return `Err` if spillover keyword is malformed
1461    pub fn get_spillover_matrix(&self) -> Result<Option<(Mat<f32>, Vec<String>)>> {
1462        use crate::keyword::{Keyword, MixedKeyword};
1463
1464        // Try to get compensation matrix from $SPILLOVER (FCS 3.1+), $SPILL (unofficial/custom), or $COMP (FCS 3.0)
1465        // Check in order of preference: SPILLOVER (official) > SPILL (common) > COMP (legacy)
1466        let spillover_keyword = self
1467            .metadata
1468            .keywords
1469            .get("$SPILLOVER")
1470            .or_else(|| self.metadata.keywords.get("$SPILL"))
1471            .or_else(|| self.metadata.keywords.get("$COMP"));
1472
1473        let spillover_keyword = match spillover_keyword {
1474            Some(Keyword::Mixed(MixedKeyword::SPILLOVER {
1475                n_parameters,
1476                parameter_names,
1477                matrix_values,
1478            })) => (
1479                *n_parameters,
1480                parameter_names.clone(),
1481                matrix_values.clone(),
1482            ),
1483            Some(_) => {
1484                // Keyword exists but has wrong type - might be stored as String(Other) if parsing failed
1485                // Try to parse it manually
1486                let keyword_name = if self.metadata.keywords.contains_key("$SPILLOVER") {
1487                    "$SPILLOVER"
1488                } else if self.metadata.keywords.contains_key("$SPILL") {
1489                    "$SPILL"
1490                } else if self.metadata.keywords.contains_key("$COMP") {
1491                    "$COMP"
1492                } else {
1493                    return Ok(None);
1494                };
1495
1496                // Try to get the raw string value and parse it
1497                // This handles the case where $SPILL/$COMP was stored as String(Other) because
1498                // it wasn't recognized during initial parsing
1499                if let Some(Keyword::String(crate::keyword::StringKeyword::Other(value))) =
1500                    self.metadata.keywords.get(keyword_name)
1501                {
1502                    // Parse the string value as SPILLOVER using the same logic as parse_spillover
1503                    let parts: Vec<&str> = value.trim().split(',').collect();
1504                    if !parts.is_empty() {
1505                        if let Ok(n_parameters) = parts[0].trim().parse::<usize>() {
1506                            if parts.len() >= 1 + n_parameters {
1507                                let parameter_names: Vec<String> = parts[1..=n_parameters]
1508                                    .iter()
1509                                    .map(|s| s.trim().to_string())
1510                                    .collect();
1511
1512                                let expected_matrix_size = n_parameters * n_parameters;
1513                                let matrix_start = 1 + n_parameters;
1514
1515                                if parts.len() >= matrix_start + expected_matrix_size {
1516                                    // Parse matrix values (handle comma decimal separator)
1517                                    let mut matrix_values = Vec::new();
1518                                    for part in
1519                                        &parts[matrix_start..matrix_start + expected_matrix_size]
1520                                    {
1521                                        let cleaned = part.trim().replace(',', ".");
1522                                        if let Ok(val) = cleaned.parse::<f32>() {
1523                                            matrix_values.push(val);
1524                                        } else {
1525                                            break; // Failed to parse, give up
1526                                        }
1527                                    }
1528
1529                                    if matrix_values.len() == expected_matrix_size {
1530                                        let matrix = Mat::from_fn(
1531                                            n_parameters,
1532                                            n_parameters,
1533                                            |i, j| matrix_values[i * n_parameters + j],
1534                                        );
1535                                        return Ok(Some((matrix, parameter_names)));
1536                                    }
1537                                }
1538                            }
1539                        }
1540                    }
1541                }
1542
1543                return Err(anyhow!(
1544                    "{} keyword exists but has wrong type or could not be parsed",
1545                    keyword_name
1546                ));
1547            }
1548            None => {
1549                // No spillover keyword - this is fine, not all files have it
1550                return Ok(None);
1551            }
1552        };
1553
1554        let (n_params, param_names, matrix_values): (usize, Vec<String>, Vec<f32>) =
1555            spillover_keyword;
1556
1557        // Validate matrix dimensions
1558        let expected_matrix_size = n_params * n_params;
1559        if matrix_values.len() != expected_matrix_size {
1560            return Err(anyhow!(
1561                "SPILLOVER matrix size mismatch: expected {} values for {}x{} matrix, got {}",
1562                expected_matrix_size,
1563                n_params,
1564                n_params,
1565                matrix_values.len()
1566            ));
1567        }
1568
1569        // Create Mat from matrix values (FCS spillover is stored row-major)
1570        let matrix =
1571            Mat::from_fn(n_params, n_params, |i, j| matrix_values[i * n_params + j]);
1572
1573        Ok(Some((matrix, param_names)))
1574    }
1575
1576    /// Check if this file has compensation information
1577    #[must_use]
1578    pub fn has_compensation(&self) -> bool {
1579        self.get_spillover_matrix()
1580            .map(|opt| opt.is_some())
1581            .unwrap_or(false)
1582    }
1583
1584    /// Resolve spillover channel labels to actual $PnN channel names.
1585    /// Some instruments (e.g. IntelliCyt iQue3) use parameter numbers ("1","2","3") in
1586    /// $SPILLOVER instead of channel names; these are mapped via metadata.get_parameter_channel_name.
1587    fn resolve_spillover_channel_names(&self, channel_names: &[String]) -> Result<Vec<String>> {
1588        let mut resolved = Vec::with_capacity(channel_names.len());
1589        for name in channel_names {
1590            let resolved_name = if let Ok(param_idx) = name.trim().parse::<usize>() {
1591                if param_idx >= 1 {
1592                    self.metadata
1593                        .get_parameter_channel_name(param_idx)
1594                        .map(|s| s.to_string())
1595                        .map_err(|e| {
1596                            anyhow!(
1597                                "Parameter {} not found: spillover references \"{}\" but {}",
1598                                param_idx,
1599                                name,
1600                                e
1601                            )
1602                        })?
1603                } else {
1604                    name.clone()
1605                }
1606            } else {
1607                name.clone()
1608            };
1609            if !self.parameters.contains_key(resolved_name.as_str()) {
1610                return Err(anyhow!(
1611                    "Spillover channel \"{}\" (resolved) not found in parameters",
1612                    resolved_name
1613                ));
1614            }
1615            resolved.push(resolved_name);
1616        }
1617        Ok(resolved)
1618    }
1619
1620    /// Apply compensation from the file's $SPILLOVER keyword
1621    /// Convenience method that extracts spillover and applies it automatically
1622    ///
1623    /// Resolves spillover channel labels: some instruments (e.g. IntelliCyt iQue3) use
1624    /// parameter numbers ("1", "2", "3") in $SPILLOVER instead of $PnN names; these are
1625    /// mapped to actual channel names via metadata.
1626    ///
1627    /// # Returns
1628    /// New DataFrame with compensated data, or error if no spillover keyword exists
1629    pub fn apply_file_compensation(&self) -> Result<EventDataFrame> {
1630        let (comp_matrix, channel_names) = self
1631            .get_spillover_matrix()?
1632            .ok_or_else(|| anyhow!("No $SPILLOVER keyword found in FCS file"))?;
1633
1634        let channel_refs: Vec<String> = self.resolve_spillover_channel_names(&channel_names)?;
1635        let channel_refs: Vec<&str> = channel_refs.iter().map(|s| s.as_str()).collect();
1636
1637        self.apply_compensation(comp_matrix.as_ref(), &channel_refs)
1638    }
1639
1640    /// OPTIMIZED: Get compensated data for specific parameters only (lazy/partial compensation)
1641    ///
1642    /// This is 15-30x faster than apply_file_compensation when you only need a few parameters
1643    /// because it:
1644    /// - Only compensates the requested channels (e.g., 2 vs 30)
1645    /// - Uses sparse matrix optimization for matrices with >80% zeros
1646    /// - Bypasses compensation entirely for identity matrices
1647    ///
1648    /// # Arguments
1649    /// * `channels_needed` - Only the channel names you need compensated (typically 2 for a plot)
1650    ///
1651    /// # Returns
1652    /// HashMap of channel_name -> compensated data (as Vec<f32>)
1653    ///
1654    /// # Performance
1655    /// - Dense matrix (2/30 channels): **15x faster** (150ms → 10ms)
1656    /// - Sparse matrix (90% sparse): **50x faster** (150ms → 3ms)
1657    /// - Identity matrix: **300x faster** (150ms → 0.5ms)
1658    pub fn get_compensated_parameters(
1659        &self,
1660        channels_needed: &[&str],
1661    ) -> Result<std::collections::HashMap<String, Vec<f32>>> {
1662        use std::collections::HashMap;
1663
1664        // Get spillover matrix
1665        let (comp_matrix, matrix_channel_names) = self
1666            .get_spillover_matrix()?
1667            .ok_or_else(|| anyhow!("No $SPILLOVER keyword found in FCS file"))?;
1668
1669        let n_events = self.get_event_count_from_dataframe();
1670
1671        // OPTIMIZATION 1: Check if matrix is identity (no compensation needed)
1672        let is_identity = {
1673            let mut is_id = true;
1674            for i in 0..comp_matrix.nrows() {
1675                for j in 0..comp_matrix.ncols() {
1676                    let expected = if i == j { 1.0 } else { 0.0 };
1677                    if (comp_matrix[(i, j)] - expected).abs() > 1e-6 {
1678                        is_id = false;
1679                        break;
1680                    }
1681                }
1682                if !is_id {
1683                    break;
1684                }
1685            }
1686            is_id
1687        };
1688
1689        if is_identity {
1690            eprintln!("🚀 Identity matrix detected - bypassing compensation");
1691            // Just return original data
1692            let mut result = HashMap::new();
1693            for &channel in channels_needed {
1694                let data = self.get_parameter_events_slice(channel)?;
1695                result.insert(channel.to_string(), data.to_vec());
1696            }
1697            return Ok(result);
1698        }
1699
1700        // OPTIMIZATION 2: Analyze sparsity
1701        let total_elements = comp_matrix.nrows() * comp_matrix.ncols();
1702        let mut non_zero_count = 0;
1703        for i in 0..comp_matrix.nrows() {
1704            for j in 0..comp_matrix.ncols() {
1705                if comp_matrix[(i, j)].abs() > 1e-6 {
1706                    non_zero_count += 1;
1707                }
1708            }
1709        }
1710        let sparsity = 1.0 - (non_zero_count as f64 / total_elements as f64);
1711        let is_sparse = sparsity > 0.8;
1712
1713        eprintln!(
1714            "📊 Compensation matrix: {:.1}% sparse, {} non-zero coefficients",
1715            sparsity * 100.0,
1716            non_zero_count
1717        );
1718
1719        // Find indices of channels we need
1720        let channel_indices: HashMap<&str, usize> = matrix_channel_names
1721            .iter()
1722            .enumerate()
1723            .map(|(i, name)| (name.as_str(), i))
1724            .collect();
1725
1726        let needed_indices: Vec<(String, usize)> = channels_needed
1727            .iter()
1728            .filter_map(|&ch| channel_indices.get(ch).map(|&idx| (ch.to_string(), idx)))
1729            .collect();
1730
1731        if needed_indices.is_empty() {
1732            return Err(anyhow!(
1733                "None of the requested channels found in compensation matrix"
1734            ));
1735        }
1736
1737        // Extract ONLY the channels involved in compensating our needed channels
1738        // For each needed channel, we need all channels that have non-zero spillover
1739        let mut involved_indices = std::collections::HashSet::new();
1740        for &(_, row_idx) in &needed_indices {
1741            // Add the channel itself
1742            involved_indices.insert(row_idx);
1743
1744            // Add channels with non-zero spillover
1745            if is_sparse {
1746                for col_idx in 0..comp_matrix.ncols() {
1747                    if comp_matrix[(row_idx, col_idx)].abs() > 1e-6 {
1748                        involved_indices.insert(col_idx);
1749                    }
1750                }
1751            } else {
1752                // For dense matrix, we need all channels
1753                for i in 0..comp_matrix.ncols() {
1754                    involved_indices.insert(i);
1755                }
1756            }
1757        }
1758
1759        let mut involved_vec: Vec<usize> = involved_indices.into_iter().collect();
1760        involved_vec.sort_unstable();
1761
1762        eprintln!(
1763            "🎯 Lazy compensation: loading {} channels (vs {} total)",
1764            involved_vec.len(),
1765            matrix_channel_names.len()
1766        );
1767
1768        // Extract data for involved channels only
1769        let mut channel_data: Vec<Vec<f32>> = Vec::with_capacity(involved_vec.len());
1770        for &idx in &involved_vec {
1771            let channel_name = &matrix_channel_names[idx];
1772            let data = self.get_parameter_events_slice(channel_name)?;
1773            channel_data.push(data.to_vec());
1774        }
1775
1776        // Extract sub-matrix for involved channels
1777        let sub_matrix = Mat::from_fn(involved_vec.len(), involved_vec.len(), |i, j| {
1778            comp_matrix[(involved_vec[i], involved_vec[j])]
1779        });
1780
1781        // Use CPU compensation (benchmarked: GPU was slower due to transfer overhead)
1782        // Invert sub-matrix using faer (pure Rust, no system BLAS)
1783        let comp_inv = crate::matrix::MatrixOps::invert_matrix(sub_matrix.as_ref())
1784            .map_err(|e| anyhow!("Failed to invert compensation matrix: {:?}", e))?;
1785
1786        // Compensate ONLY the involved channels
1787        use rayon::prelude::*;
1788        let n_involved = involved_vec.len();
1789        let compensated_data: Vec<Vec<f32>> = (0..n_involved)
1790            .into_par_iter()
1791            .map(|i| {
1792                let mut result = vec![0.0; n_events];
1793                for event_idx in 0..n_events {
1794                    let mut sum = 0.0;
1795                    for j in 0..n_involved {
1796                        sum += comp_inv[(i, j)] * channel_data[j][event_idx];
1797                    }
1798                    result[event_idx] = sum;
1799                }
1800                result
1801            })
1802            .collect();
1803
1804        // Build result HashMap for only the channels we need
1805        let mut result = HashMap::new();
1806        for (channel_name, orig_idx) in needed_indices {
1807            if let Some(local_idx) = involved_vec.iter().position(|&x| x == orig_idx) {
1808                result.insert(channel_name, compensated_data[local_idx].clone());
1809            }
1810        }
1811
1812        eprintln!("🚀 Lazy compensation completed");
1813        Ok(result)
1814    }
1815
1816    /// Apply compensation matrix to the data using Polars
1817    /// Compensation corrects for spectral overlap between fluorescence channels
1818    ///
1819    /// # Arguments
1820    /// * `compensation_matrix` - 2D matrix where element [i,j] represents spillover from channel j into channel i
1821    /// * `channel_names` - Names of channels in the order they appear in the matrix
1822    ///
1823    /// # Returns
1824    /// New DataFrame with compensated fluorescence values
1825    ///
1826    /// # Example
1827    /// ```ignore
1828    /// // Create a 3x3 compensation matrix
1829    /// use faer::mat;
1830    /// let comp_matrix = mat![
1831    ///     [1.0, 0.1, 0.05],  // FL1-A compensation
1832    ///     [0.2, 1.0, 0.1],   // FL2-A compensation
1833    ///     [0.1, 0.15, 1.0],  // FL3-A compensation
1834    /// ];
1835    /// let channels = vec!["FL1-A", "FL2-A", "FL3-A"];
1836    /// let compensated = fcs.apply_compensation(comp_matrix.as_ref(), &channels)?;
1837    /// ```
1838    pub fn apply_compensation(
1839        &self,
1840        compensation_matrix: faer::MatRef<'_, f32>,
1841        channel_names: &[&str],
1842    ) -> Result<EventDataFrame> {
1843        let comp = compensation_matrix;
1844        // Verify matrix dimensions match channel names
1845        let n_channels = channel_names.len();
1846        if comp.nrows() != n_channels || comp.ncols() != n_channels {
1847            return Err(anyhow!(
1848                "Compensation matrix dimensions ({}, {}) don't match number of channels ({})",
1849                comp.nrows(),
1850                comp.ncols(),
1851                n_channels
1852            ));
1853        }
1854
1855        // Extract data for channels to compensate
1856        let mut channel_data: Vec<Vec<f32>> = Vec::with_capacity(n_channels);
1857        let n_events = self.get_event_count_from_dataframe();
1858
1859        for &channel_name in channel_names {
1860            let data = self.get_parameter_events_slice(channel_name)?;
1861            channel_data.push(data.to_vec());
1862        }
1863
1864        // Use CPU compensation (benchmarked: GPU was slower due to transfer overhead)
1865        // Apply compensation: compensated = original * inverse(compensation_matrix)
1866        // For efficiency, we pre-compute the inverse using faer (pure Rust, no system BLAS)
1867        let comp_inv =
1868            crate::matrix::MatrixOps::invert_matrix(comp).map_err(|e| {
1869                anyhow!("Failed to invert compensation matrix: {:?}", e)
1870            })?;
1871
1872        // Perform matrix multiplication for each event
1873        use rayon::prelude::*;
1874        let compensated_data: Vec<Vec<f32>> = (0..n_channels)
1875            .into_par_iter()
1876            .map(|i| {
1877                let mut result = vec![0.0; n_events];
1878                for event_idx in 0..n_events {
1879                    let mut sum = 0.0;
1880                    for j in 0..n_channels {
1881                        sum += comp_inv[(i, j)] * channel_data[j][event_idx];
1882                    }
1883                    result[event_idx] = sum;
1884                }
1885                result
1886            })
1887            .collect();
1888
1889        // Create new DataFrame with compensated values
1890        let mut df = (*self.data_frame).clone();
1891
1892        for (i, &channel_name) in channel_names.iter().enumerate() {
1893            let new_series = Series::new(channel_name.into(), compensated_data[i].clone());
1894            df.replace(channel_name, new_series.into())
1895                .map_err(|e| anyhow!("Failed to replace column {}: {}", channel_name, e))?;
1896        }
1897
1898        Ok(Arc::new(df))
1899    }
1900
1901    /// Apply spectral unmixing (matrix solve only, no compensation or transformation)
1902    ///
1903    /// This method performs unmixing by solving: observation = mixing_matrix × abundances
1904    /// Does NOT apply compensation or transformations - these should be done separately.
1905    ///
1906    /// For overdetermined systems (more detectors than endmembers), uses least squares.
1907    ///
1908    /// # Arguments
1909    /// * `unmixing_matrix` - Matrix describing spectral signatures of fluorophores (detectors × endmembers)
1910    /// * `detector_names` - Names of detector channels (must match matrix rows)
1911    /// * `endmember_names` - Optional names for endmembers (columns). If None, uses "Endmember1", "Endmember2", etc.
1912    ///
1913    /// # Returns
1914    /// New DataFrame with unmixed endmember abundances (columns named by endmember names or indices)
1915    ///
1916    /// # Errors
1917    /// Returns error if detector names don't match matrix dimensions or data cannot be extracted
1918    pub fn apply_spectral_unmixing(
1919        &self,
1920        unmixing_matrix: faer::MatRef<'_, f32>,
1921        detector_names: &[&str],
1922        endmember_names: Option<&[&str]>,
1923    ) -> Result<EventDataFrame> {
1924        use faer::linalg::solvers::{Qr, SolveLstsq};
1925
1926        // Verify matrix dimensions match detector names
1927        let n_detectors = detector_names.len();
1928        if unmixing_matrix.nrows() != n_detectors {
1929            return Err(anyhow!(
1930                "Unmixing matrix rows ({}) don't match number of detectors ({})",
1931                unmixing_matrix.nrows(),
1932                n_detectors
1933            ));
1934        }
1935
1936        let n_endmembers = unmixing_matrix.ncols();
1937        let n_events = self.get_event_count_from_dataframe();
1938
1939        // Extract data for detectors
1940        let mut detector_data: Vec<Vec<f32>> = Vec::with_capacity(n_detectors);
1941        for &detector_name in detector_names {
1942            let data = self.get_parameter_events_slice(detector_name)?;
1943            detector_data.push(data.to_vec());
1944        }
1945
1946        // Observations matrix: events × detectors
1947        let observations =
1948            Mat::from_fn(n_events, n_detectors, |event_idx, detector_idx| {
1949                detector_data[detector_idx][event_idx]
1950            });
1951
1952        // Perform unmixing: for each event, solve: observation = unmixing_matrix × abundances
1953        // For overdetermined systems (n_detectors > n_endmembers), use QR least squares (faer)
1954        let qr = Qr::new(unmixing_matrix);
1955
1956        let mut unmixed_data: Vec<Vec<f32>> = Vec::with_capacity(n_events);
1957        for event_idx in 0..n_events {
1958            let b_col =
1959                Mat::from_fn(n_detectors, 1, |i, _| observations[(event_idx, i)]);
1960            let x_faer = qr.solve_lstsq(b_col.as_ref());
1961            let abundances: Vec<f32> =
1962                (0..n_endmembers).map(|i| x_faer[(i, 0)]).collect();
1963            unmixed_data.push(abundances);
1964        }
1965
1966        // Create DataFrame with endmember abundances
1967        let mut columns: Vec<Column> = Vec::with_capacity(n_endmembers);
1968        for endmember_idx in 0..n_endmembers {
1969            let values: Vec<f32> = unmixed_data
1970                .iter()
1971                .map(|abundances| abundances[endmember_idx])
1972                .collect();
1973
1974            // Use provided endmember name if available, otherwise use generic "Endmember{i}" format
1975            let column_name = if let Some(names) = endmember_names {
1976                names[endmember_idx].to_string()
1977            } else {
1978                format!("Endmember{}", endmember_idx + 1)
1979            };
1980
1981            let series = Column::new(column_name.into(), values);
1982            columns.push(series);
1983        }
1984
1985        let df = DataFrame::new(n_events, columns)
1986            .map_err(|e| anyhow!("Failed to create DataFrame: {}", e))?;
1987
1988        Ok(Arc::new(df))
1989    }
1990}