flow_fcs/file.rs
1// Internal crate imports
2use crate::{
3 FcsDataType, TransformType, Transformable,
4 byteorder::ByteOrder,
5 header::Header,
6 keyword::{IntegerableKeyword, StringableKeyword},
7 metadata::Metadata,
8 parameter::{EventDataFrame, EventDatum, Parameter, ParameterBuilder, ParameterMap},
9};
10// Standard library imports
11use std::borrow::Cow;
12use std::fs::File;
13use std::ops::Deref;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16
17// External crate imports
18use anyhow::{Context, Result, anyhow};
19use byteorder::{BigEndian as BE, ByteOrder as BO, LittleEndian as LE};
20use itertools::{Itertools, MinMaxResult};
21use memmap3::{Mmap, MmapOptions};
22use faer::Mat;
23use polars::prelude::*;
24use rayon::prelude::*;
25
/// Threshold for parallel processing: only use parallel for datasets larger than this
/// Below this threshold, parallel overhead exceeds benefits
/// Based on benchmarks: 400,000 values (50,000 events × 8 parameters)
/// - Float32: Always use sequential (benchmarks show sequential is 2-13x faster)
/// - Int16/Int32/Float64: Use parallel for datasets with ≥400k values
///
/// Consumed by `parse_uniform_data_bulk` when choosing between rayon
/// (`par_chunks_exact`) and sequential chunk parsing.
const PARALLEL_THRESHOLD: usize = 400_000;
32
/// A shareable wrapper around the file path and memory-map
///
/// Uses Arc<Mmap> to share the memory mapping across clones without creating
/// new file descriptors or memory mappings. This is more efficient than cloning
/// the underlying file descriptor and re-mapping.
///
/// Implements `Deref<Target = Mmap>`, so byte slices can be taken directly
/// from the wrapper (e.g. `&wrapper[a..b]`).
#[derive(Debug, Clone)]
pub struct AccessWrapper {
    /// An owned, mutable path to the file on disk
    pub path: PathBuf,
    /// The memory-mapped file, shared via Arc for efficient cloning
    ///
    /// # Safety
    /// The Mmap is created from a File handle and remains valid as long as:
    /// 1. The file is not truncated while mapped
    /// 2. The file contents are not modified while mapped (we only read)
    /// 3. The Mmap is not accessed after the file is deleted
    ///
    /// Our usage satisfies these invariants because:
    /// - FCS files are read-only once opened (we never write back to them)
    /// - The file remains open (via File handle) for the lifetime of the Mmap
    /// - We only drop the Mmap when the FCS file is no longer needed
    pub mmap: Arc<Mmap>,
}
56
57impl AccessWrapper {
58 /// Creates a new `AccessWrapper` from a file path
59 /// # Errors
60 /// Will return `Err` if:
61 /// - the file cannot be opened
62 /// - the file cannot be memory-mapped
63 pub fn new(path: &str) -> Result<Self> {
64 let file = File::open(path)?;
65 let path = PathBuf::from(path);
66
67 // memmap3 provides better safety guarantees than memmap2, though OS-level
68 // memory mapping still requires unsafe at creation time. The resulting Mmap
69 // is safe to use and provides better guarantees than memmap2.
70 let mmap = unsafe { MmapOptions::new().map(&file)? };
71
72 Ok(Self {
73 path,
74 mmap: Arc::new(mmap),
75 })
76 }
77}
78
/// Lets an `AccessWrapper` stand in for its `Mmap` (and, through `Mmap`'s own
/// `Deref`, for `&[u8]`), so callers can index/slice the mapped bytes directly.
impl Deref for AccessWrapper {
    type Target = Mmap;

    fn deref(&self) -> &Self::Target {
        &self.mmap
    }
}
86
/// A struct representing an FCS file
#[derive(Debug, Clone)]
pub struct Fcs {
    /// The header segment of the fcs file, including the version, and byte offsets to the text, data, and analysis segments
    pub header: Header,
    /// The metadata segment of the fcs file, including the delimiter, and a hashmap of keyword/value pairs
    pub metadata: Metadata,
    /// A hashmap of the parameter names and their associated metadata
    pub parameters: ParameterMap,

    /// Event data stored in columnar format via Polars DataFrame
    /// Each column represents one parameter (e.g., FSC-A, SSC-A, FL1-A)
    /// Polars provides:
    /// - Zero-copy column access
    /// - Built-in SIMD operations
    /// - Lazy evaluation for complex queries
    /// - Apache Arrow interop
    /// This is the primary data format going forward
    pub data_frame: EventDataFrame,

    /// A wrapper around the file, path, and memory-map
    /// (the map is shared via `Arc`, so cloning `Fcs` does not re-map the file)
    pub file_access: AccessWrapper,
}
110
111impl Fcs {
    /// Creates a new `Fcs` struct with default header/metadata, no parameters,
    /// and an empty DataFrame.
    ///
    /// NOTE(review): this calls `AccessWrapper::new("")`, and `File::open("")`
    /// fails on every mainstream platform — so this constructor almost
    /// certainly always returns `Err`. Consider making `file_access` optional
    /// or providing a genuine in-memory default. TODO confirm intended use.
    ///
    /// # Errors
    /// Will return `Err` if the placeholder path `""` cannot be opened or
    /// memory-mapped (which, per the note above, is the expected outcome).
    pub fn new() -> Result<Self> {
        Ok(Self {
            header: Header::new(),
            metadata: Metadata::new(),
            parameters: ParameterMap::default(),
            data_frame: Arc::new(DataFrame::empty()),
            file_access: AccessWrapper::new("")?,
        })
    }
129
    /// Opens and parses an FCS file from the given path
    ///
    /// This is the primary entry point for reading FCS files. It:
    /// - Validates the file extension (must be `.fcs`)
    /// - Memory-maps the file for efficient access
    /// - Parses the header segment to determine FCS version and segment offsets
    /// - Parses the text segment to extract metadata and keywords
    /// - Validates required keywords for the FCS version
    /// - Generates a GUID if one is not present
    /// - Loads event data into a Polars DataFrame for efficient columnar access
    ///
    /// # Arguments
    /// * `path` - Path to the FCS file (must have `.fcs` extension)
    ///
    /// # Errors
    /// Will return `Err` if:
    /// - the file cannot be opened or memory-mapped
    /// - the file extension is not `.fcs`
    /// - the FCS version is invalid or unsupported
    /// - required keywords are missing for the FCS version
    /// - the data segment cannot be read or parsed
    /// - parameter metadata cannot be generated
    ///
    /// # Example
    /// ```no_run
    /// use flow_fcs::Fcs;
    ///
    /// let fcs = Fcs::open("data/sample.fcs")?;
    /// println!("File has {} events", fcs.get_number_of_events()?);
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn open(path: &str) -> Result<Self> {
        use tracing::debug;

        // Attempt to open the file path
        let file_access =
            AccessWrapper::new(path).with_context(|| format!("Failed to open file: {}", path))?;

        // Validate the file extension
        Self::validate_fcs_extension(&file_access.path).with_context(|| {
            format!("Invalid file extension for: {}", file_access.path.display())
        })?;

        // Create header and metadata structs from a memory map of the file
        let header = Header::from_mmap(&file_access.mmap).with_context(|| {
            format!(
                "Failed to parse header from file: {}",
                file_access.path.display()
            )
        })?;
        let mut metadata = Metadata::from_mmap(&file_access.mmap, &header);

        metadata
            .validate_text_segment_keywords(&header)
            .with_context(|| {
                format!(
                    "Failed to validate text segment keywords in file: {}",
                    file_access.path.display()
                )
            })?;
        // NOTE(review): parameter-count validation is intentionally disabled
        // here — confirm whether it should be re-enabled.
        // metadata.validate_number_of_parameters()?;
        metadata.validate_guid();

        // Log $TOT keyword value; kept as Option so the mismatch check below
        // is skipped when $TOT is absent rather than failing the open.
        let tot_events = metadata.get_number_of_events().ok().copied();
        if let Some(tot) = tot_events {
            debug!("FCS file $TOT keyword: {} events", tot);
        }

        // Both failure paths below attach a full diagnostic dump, since these
        // are the errors users most often need to debug from logs alone.
        let parameters = Self::generate_parameter_map(&metadata).map_err(|e| {
            let diagnostic = Self::format_diagnostic_info(&header, &metadata, &file_access.path);
            anyhow!("Failed to generate parameter map: {}\n\n{}", e, diagnostic)
        })?;

        let data_frame = Self::store_raw_data_as_dataframe(&header, &file_access.mmap, &metadata)
            .map_err(|e| {
                let diagnostic = Self::format_diagnostic_info(&header, &metadata, &file_access.path);
                anyhow!(
                    "Failed to store raw data as DataFrame: {}\n\n{}",
                    e,
                    diagnostic
                )
            })?;

        let fcs = Self {
            parameters,
            data_frame,
            file_access,
            header,
            metadata,
        };

        // Log DataFrame event count and compare to $TOT (mismatch is a
        // warning, not an error — some writers emit an inaccurate $TOT)
        let df_events = fcs.get_event_count_from_dataframe();
        if let Some(tot) = tot_events {
            if df_events != tot {
                tracing::warn!(
                    "Event count mismatch: DataFrame has {} events but $TOT keyword says {} (difference: {})",
                    df_events,
                    tot,
                    tot as i64 - df_events as i64
                );
            } else {
                debug!("Event count matches $TOT keyword: {} events", df_events);
            }
        }

        // Log compensation status
        let has_compensation = fcs.has_compensation();
        debug!(
            "Compensation: {} (SPILLOVER keyword {})",
            if has_compensation {
                "available"
            } else {
                "not available"
            },
            if has_compensation {
                "present"
            } else {
                "missing"
            }
        );

        // Log parameter count
        let n_params = fcs.parameters.len();
        debug!(
            "FCS file loaded: {} parameters, {} events",
            n_params, df_events
        );

        Ok(fcs)
    }
262
263 /// Validates that the file extension is `.fcs`
264 /// # Errors
265 /// Will return `Err` if the file extension is not `.fcs`
266 fn validate_fcs_extension(path: &Path) -> Result<()> {
267 let extension = path
268 .extension()
269 .ok_or_else(|| anyhow!("File has no extension"))?
270 .to_str()
271 .ok_or_else(|| anyhow!("File extension is not valid UTF-8"))?;
272
273 if extension.to_lowercase() != "fcs" {
274 return Err(anyhow!("Invalid file extension: {}", extension));
275 }
276
277 Ok(())
278 }
279
    /// Reads raw data from FCS file and stores it as a Polars DataFrame
    /// Returns columnar data optimized for parameter-wise access patterns
    ///
    /// This function provides significant performance benefits over row-based array storage:
    /// - 2-5x faster data filtering and transformations
    /// - Native columnar storage (optimal for FCS parameter access patterns)
    /// - Zero-copy operations via Apache Arrow
    /// - Built-in SIMD acceleration
    ///
    /// # Errors
    /// Will return `Err` if:
    /// - The data cannot be read
    /// - The data cannot be converted to f32 values
    /// - The DataFrame cannot be constructed
    fn store_raw_data_as_dataframe(
        header: &Header,
        mmap: &Mmap,
        metadata: &Metadata,
    ) -> Result<EventDataFrame> {
        // Validate data offset bounds before accessing mmap
        let mut data_start = *header.data_offset.start();
        let mut data_end = *header.data_offset.end();
        let mmap_len = mmap.len();

        // Handle zero offsets by checking keywords: a zero header offset means
        // the true offset lives in the $BEGINDATA/$ENDDATA TEXT keywords
        // (used when the offset doesn't fit the fixed-width header fields)
        if data_start == 0 {
            if let Ok(begin_data) = metadata.get_integer_keyword("$BEGINDATA") {
                data_start = begin_data.get_usize().clone();
            } else {
                return Err(anyhow!(
                    "$BEGINDATA keyword not found. Unable to determine data start."
                ));
            }
        }

        if data_end == 0 {
            if let Ok(end_data) = metadata.get_integer_keyword("$ENDDATA") {
                data_end = end_data.get_usize().clone();
            } else {
                return Err(anyhow!(
                    "$ENDDATA keyword not found. Unable to determine data end."
                ));
            }
        }

        // Validate offsets. `data_end` must be strictly below `mmap_len`
        // because the slice taken below is inclusive (`..=data_end`).
        if data_start >= mmap_len {
            return Err(anyhow!(
                "Data start offset {} is beyond mmap length {}",
                data_start,
                mmap_len
            ));
        }

        if data_end >= mmap_len {
            return Err(anyhow!(
                "Data end offset {} is beyond mmap length {}",
                data_end,
                mmap_len
            ));
        }

        if data_start > data_end {
            return Err(anyhow!(
                "Data start offset {} is greater than end offset {}",
                data_start,
                data_end
            ));
        }

        // Extract data bytes (inclusive range, per the FCS offset convention)
        let data_bytes = &mmap[data_start..=data_end];

        let number_of_parameters = metadata
            .get_number_of_parameters()
            .context("Failed to retrieve the number of parameters from metadata")?;
        let number_of_events = metadata
            .get_number_of_events()
            .context("Failed to retrieve the number of events from metadata")?;

        // Calculate bytes per event by summing $PnB / 8 for each parameter
        // This is more accurate than using $DATATYPE which only provides a default
        let bytes_per_event = metadata
            .calculate_bytes_per_event()
            .context("Failed to calculate bytes per event")?;

        let byte_order = metadata
            .get_byte_order()
            .context("Failed to get the byte order from metadata")?;

        // Validate data size (the segment may legally be *longer* than
        // expected, e.g. trailing padding; only too-short is an error)
        let expected_total_bytes = number_of_events * bytes_per_event;
        if data_bytes.len() < expected_total_bytes {
            return Err(anyhow!(
                "Insufficient data: expected {} bytes ({} events × {} bytes/event), but only have {} bytes",
                expected_total_bytes,
                number_of_events,
                bytes_per_event,
                data_bytes.len()
            ));
        }

        // Collect bytes per parameter and data types for each parameter
        // (FCS parameter keywords are 1-indexed: $P1B, $P2B, ...)
        let bytes_per_parameter: Vec<usize> = (1..=*number_of_parameters)
            .map(|param_num| {
                metadata
                    .get_bytes_per_parameter(param_num)
                    .with_context(|| {
                        format!(
                            "Failed to get bytes per parameter for parameter {}",
                            param_num
                        )
                    })
            })
            .collect::<Result<Vec<_>>>()?;

        let data_types: Vec<FcsDataType> = (1..=*number_of_parameters)
            .map(|param_num| {
                metadata
                    .get_data_type_for_channel(param_num)
                    .with_context(|| format!("Failed to get data type for channel {}", param_num))
            })
            .collect::<Result<Vec<_>>>()?;

        // Fast path: Check if all parameters are uniform (same bytes, same data type)
        // This allows us to use bytemuck zero-copy optimization
        let uniform_bytes = bytes_per_parameter.first().copied();
        let uniform_data_type = data_types.first().copied();
        let is_uniform = match (uniform_bytes, uniform_data_type) {
            (Some(bytes), Some(dt)) => {
                bytes_per_parameter.iter().all(|&b| b == bytes)
                    && data_types.iter().all(|&dt_val| dt_val == dt)
            }
            _ => false,
        };

        let f32_values: Vec<f32> = if is_uniform {
            // Fast path: All parameters have same size and type - use bytemuck for zero-copy
            let bytes_per_param = uniform_bytes.ok_or_else(|| anyhow!("No uniform bytes found"))?;
            let data_type =
                uniform_data_type.ok_or_else(|| anyhow!("No uniform data type found"))?;

            match (data_type, bytes_per_param) {
                (FcsDataType::F, 4) => {
                    // Fast path: float32 - use sequential (benchmarks show 2.57x faster than parallel)
                    // A swap is needed only when the file's byte order differs
                    // from the host's endianness.
                    let needs_swap = match (byte_order, cfg!(target_endian = "little")) {
                        (ByteOrder::LittleEndian, true) | (ByteOrder::BigEndian, false) => false,
                        _ => true,
                    };

                    match bytemuck::try_cast_slice::<u8, f32>(data_bytes) {
                        Ok(f32_slice) => {
                            #[cfg(debug_assertions)]
                            eprintln!(
                                "✓ Fast path (bytemuck zero-copy, sequential): {} bytes, {} f32s",
                                data_bytes.len(),
                                f32_slice.len()
                            );

                            if needs_swap {
                                // Sequential byte swap - faster than parallel for float32
                                f32_slice
                                    .iter()
                                    .map(|&f| f32::from_bits(f.to_bits().swap_bytes()))
                                    .collect()
                            } else {
                                f32_slice.to_vec()
                            }
                        }
                        Err(_) => {
                            // bytemuck requires 4-byte alignment; data segments
                            // starting at an unaligned mmap offset land here.
                            #[cfg(debug_assertions)]
                            eprintln!(
                                "⚠ Fast path (bytemuck fallback, sequential): unaligned data ({} bytes)",
                                data_bytes.len()
                            );

                            // Fallback: parse in chunks sequentially (faster than parallel for float32)
                            data_bytes
                                .chunks_exact(4)
                                .map(|chunk| {
                                    let mut bytes = [0u8; 4];
                                    bytes.copy_from_slice(chunk);
                                    let bits = u32::from_ne_bytes(bytes);
                                    let bits = if needs_swap { bits.swap_bytes() } else { bits };
                                    f32::from_bits(bits)
                                })
                                .collect()
                        }
                    }
                }
                _ => {
                    // Uniform but not float32 - use optimized bulk parsing
                    Self::parse_uniform_data_bulk(
                        data_bytes,
                        bytes_per_param,
                        &data_type,
                        byte_order,
                        *number_of_events,
                        *number_of_parameters,
                    )?
                }
            }
        } else {
            // Slow path: Variable-width parameters - parse event-by-event
            Self::parse_variable_width_data(
                data_bytes,
                &bytes_per_parameter,
                &data_types,
                byte_order,
                *number_of_events,
                *number_of_parameters,
            )?
        };

        // Create Polars Series for each parameter (column)
        // FCS data is stored row-wise (event1_param1, event1_param2, ..., event2_param1, ...)
        // We need to extract columns using stride access
        let mut columns: Vec<Column> = Vec::with_capacity(*number_of_parameters);

        for param_idx in 0..*number_of_parameters {
            // Extract this parameter's values across all events
            // Use iterator with step_by for efficient stride access
            let param_values: Vec<f32> = f32_values
                .iter()
                .skip(param_idx)
                .step_by(*number_of_parameters)
                .copied()
                .collect();

            // Verify we got the right number of events
            assert_eq!(
                param_values.len(),
                *number_of_events,
                "Parameter {} should have {} events, got {}",
                param_idx + 1,
                number_of_events,
                param_values.len()
            );

            // Get parameter name from metadata for column name
            // (falls back to "P<n>" when the $PnN keyword is missing)
            let param_name = metadata
                .get_parameter_channel_name(param_idx + 1)
                .map(|s| s.to_string())
                .unwrap_or_else(|_| format!("P{}", param_idx + 1));

            // Create Series (Polars column) with name
            let series = Column::new(param_name.as_str().into(), param_values);
            columns.push(series);
        }

        // Create DataFrame from columns (height = number of events)
        let df = DataFrame::new(*number_of_events, columns).map_err(|e| {
            anyhow!(
                "Failed to create DataFrame from {} columns: {}",
                number_of_parameters,
                e
            )
        })?;

        // Verify DataFrame shape
        assert_eq!(
            df.height(),
            *number_of_events,
            "DataFrame height {} doesn't match expected events {}",
            df.height(),
            number_of_events
        );
        assert_eq!(
            df.width(),
            *number_of_parameters,
            "DataFrame width {} doesn't match expected parameters {}",
            df.width(),
            number_of_parameters
        );

        #[cfg(debug_assertions)]
        eprintln!(
            "✓ Created DataFrame: {} events × {} parameters",
            df.height(),
            df.width()
        );

        Ok(Arc::new(df))
    }
564
565 /// Parse uniform data in bulk (all parameters have same size and type)
566 ///
567 /// This is faster than event-by-event parsing when all parameters are uniform.
568 /// Uses conditional parallelization based on data type and size:
569 /// - float32: always sequential (benchmarks show 2.57x faster)
570 /// - int16/int32: parallel only above threshold (parallel is 1.84x faster for large datasets)
571 /// - float64: parallel only above threshold
572 ///
573 /// # Arguments
574 /// * `data_bytes` - Raw data bytes
575 /// * `bytes_per_param` - Bytes per parameter (same for all)
576 /// * `data_type` - Data type (same for all)
577 /// * `byte_order` - Byte order
578 /// * `num_events` - Number of events
579 /// * `num_params` - Number of parameters
580 ///
581 /// # Errors
582 /// Will return `Err` if parsing fails
583 #[inline]
584 fn parse_uniform_data_bulk(
585 data_bytes: &[u8],
586 bytes_per_param: usize,
587 data_type: &FcsDataType,
588 byte_order: &ByteOrder,
589 num_events: usize,
590 num_params: usize,
591 ) -> Result<Vec<f32>> {
592 let total_values = num_events * num_params;
593 let use_parallel = total_values > PARALLEL_THRESHOLD;
594 let mut f32_values = Vec::with_capacity(total_values);
595
596 match (data_type, bytes_per_param) {
597 (FcsDataType::I, 2) => {
598 // int16 - parallel is 1.84x faster for large datasets
599 if use_parallel {
600 data_bytes
601 .par_chunks_exact(2)
602 .map(|chunk| {
603 let value = match byte_order {
604 ByteOrder::LittleEndian => LE::read_u16(chunk),
605 ByteOrder::BigEndian => BE::read_u16(chunk),
606 };
607 value as f32
608 })
609 .collect_into_vec(&mut f32_values);
610 } else {
611 // Sequential for small datasets
612 f32_values = data_bytes
613 .chunks_exact(2)
614 .map(|chunk| {
615 let value = match byte_order {
616 ByteOrder::LittleEndian => LE::read_u16(chunk),
617 ByteOrder::BigEndian => BE::read_u16(chunk),
618 };
619 value as f32
620 })
621 .collect();
622 }
623 }
624 (FcsDataType::I, 4) => {
625 // int32 - parallel only above threshold
626 if use_parallel {
627 data_bytes
628 .par_chunks_exact(4)
629 .map(|chunk| {
630 let value = match byte_order {
631 ByteOrder::LittleEndian => LE::read_u32(chunk),
632 ByteOrder::BigEndian => BE::read_u32(chunk),
633 };
634 value as f32
635 })
636 .collect_into_vec(&mut f32_values);
637 } else {
638 // Sequential for small datasets
639 f32_values = data_bytes
640 .chunks_exact(4)
641 .map(|chunk| {
642 let value = match byte_order {
643 ByteOrder::LittleEndian => LE::read_u32(chunk),
644 ByteOrder::BigEndian => BE::read_u32(chunk),
645 };
646 value as f32
647 })
648 .collect();
649 }
650 }
651 (FcsDataType::F, 4) => {
652 // float32 - always sequential (benchmarks show 2.57x faster than parallel)
653 // This is a fallback path - normally handled by bytemuck in store_raw_data_as_dataframe
654 let needs_swap = match (byte_order, cfg!(target_endian = "little")) {
655 (ByteOrder::LittleEndian, true) | (ByteOrder::BigEndian, false) => false,
656 _ => true,
657 };
658 f32_values = data_bytes
659 .chunks_exact(4)
660 .map(|chunk| {
661 let mut bytes = [0u8; 4];
662 bytes.copy_from_slice(chunk);
663 let bits = u32::from_ne_bytes(bytes);
664 let bits = if needs_swap { bits.swap_bytes() } else { bits };
665 f32::from_bits(bits)
666 })
667 .collect();
668 }
669 (FcsDataType::D, 8) => {
670 // float64 - parallel only above threshold
671 if use_parallel {
672 data_bytes
673 .par_chunks_exact(8)
674 .map(|chunk| {
675 let value = match byte_order {
676 ByteOrder::LittleEndian => LE::read_f64(chunk),
677 ByteOrder::BigEndian => BE::read_f64(chunk),
678 };
679 value as f32
680 })
681 .collect_into_vec(&mut f32_values);
682 } else {
683 // Sequential for small datasets
684 f32_values = data_bytes
685 .chunks_exact(8)
686 .map(|chunk| {
687 let value = match byte_order {
688 ByteOrder::LittleEndian => LE::read_f64(chunk),
689 ByteOrder::BigEndian => BE::read_f64(chunk),
690 };
691 value as f32
692 })
693 .collect();
694 }
695 }
696 _ => {
697 return Err(anyhow!(
698 "Unsupported uniform data type: {:?} with {} bytes",
699 data_type,
700 bytes_per_param
701 ));
702 }
703 }
704
705 Ok(f32_values)
706 }
707
708 /// Parse a parameter value from bytes to f32 based on data type and bytes per parameter
709 ///
710 /// Handles different data types:
711 /// - int16 (2 bytes) - unsigned integer
712 /// - int32 (4 bytes) - unsigned integer
713 /// - float32 (4 bytes) - single-precision floating point
714 /// - float64 (8 bytes) - double-precision floating point
715 ///
716 /// # Arguments
717 /// * `bytes` - Raw bytes for the parameter value
718 /// * `bytes_per_param` - Number of bytes per parameter (from $PnB / 8)
719 /// * `data_type` - Data type (I, F, or D)
720 /// * `byte_order` - Byte order of the file
721 ///
722 /// # Errors
723 /// Will return `Err` if the bytes cannot be parsed according to the data type
724 #[cold]
725 fn parse_parameter_value_to_f32(
726 bytes: &[u8],
727 bytes_per_param: usize,
728 data_type: &FcsDataType,
729 byte_order: &ByteOrder,
730 ) -> Result<f32> {
731 match (data_type, bytes_per_param) {
732 (FcsDataType::I, 2) => {
733 // int16 (unsigned 16-bit integer)
734 let value = match byte_order {
735 ByteOrder::LittleEndian => LE::read_u16(bytes),
736 ByteOrder::BigEndian => BE::read_u16(bytes),
737 };
738 Ok(value as f32)
739 }
740 (FcsDataType::I, 4) => {
741 // int32 (unsigned 32-bit integer)
742 let value = match byte_order {
743 ByteOrder::LittleEndian => LE::read_u32(bytes),
744 ByteOrder::BigEndian => BE::read_u32(bytes),
745 };
746 Ok(value as f32)
747 }
748 (FcsDataType::F, 4) => {
749 // float32 (single-precision floating point)
750 Ok(byte_order.read_f32(bytes))
751 }
752 (FcsDataType::D, 8) => {
753 // float64 (double-precision floating point) - convert to f32
754 let value = match byte_order {
755 ByteOrder::LittleEndian => LE::read_f64(bytes),
756 ByteOrder::BigEndian => BE::read_f64(bytes),
757 };
758 Ok(value as f32)
759 }
760 (FcsDataType::I, _) => Err(anyhow!(
761 "Unsupported integer size: {} bytes (expected 2 or 4)",
762 bytes_per_param
763 )),
764 (FcsDataType::F, _) => Err(anyhow!(
765 "Invalid float32 size: {} bytes (expected 4)",
766 bytes_per_param
767 )),
768 (FcsDataType::D, _) => Err(anyhow!(
769 "Invalid float64 size: {} bytes (expected 8)",
770 bytes_per_param
771 )),
772 (FcsDataType::A, _) => Err(anyhow!("ASCII data type not supported")),
773 }
774 }
775
776 /// Parse variable-width data event-by-event (cold path)
777 ///
778 /// This is the slower path used when parameters have different sizes/types.
779 /// Marked as `#[cold]` to help the compiler optimize the hot path.
780 ///
781 /// # Arguments
782 /// * `data_bytes` - Raw data bytes
783 /// * `bytes_per_parameter` - Bytes per parameter for each parameter
784 /// * `data_types` - Data type for each parameter
785 /// * `byte_order` - Byte order
786 /// * `num_events` - Number of events
787 /// * `num_params` - Number of parameters
788 ///
789 /// # Errors
790 /// Will return `Err` if parsing fails
791 #[cold]
792 fn parse_variable_width_data(
793 data_bytes: &[u8],
794 bytes_per_parameter: &[usize],
795 data_types: &[FcsDataType],
796 byte_order: &ByteOrder,
797 num_events: usize,
798 num_params: usize,
799 ) -> Result<Vec<f32>> {
800 let mut f32_values: Vec<f32> = Vec::with_capacity(num_events * num_params);
801 let mut data_offset = 0;
802
803 for event_idx in 0..num_events {
804 for (param_idx, &bytes_per_param) in bytes_per_parameter.iter().enumerate() {
805 let param_num = param_idx + 1;
806 let data_type = &data_types[param_idx];
807
808 // Extract bytes for this parameter value
809 if data_offset + bytes_per_param > data_bytes.len() {
810 return Err(anyhow!(
811 "Insufficient data at event {}, parameter {}: need {} bytes but only have {} remaining",
812 event_idx + 1,
813 param_num,
814 bytes_per_param,
815 data_bytes.len() - data_offset
816 ));
817 }
818
819 let param_bytes = &data_bytes[data_offset..data_offset + bytes_per_param];
820 let f32_value = Self::parse_parameter_value_to_f32(
821 param_bytes,
822 bytes_per_param,
823 data_type,
824 byte_order,
825 )
826 .map_err(|e| anyhow!("Failed to parse parameter {} value: {}", param_num, e))?;
827
828 f32_values.push(f32_value);
829 data_offset += bytes_per_param;
830 }
831 }
832
833 Ok(f32_values)
834 }
835
836 /// Looks for the parameter name as a key in the `parameters` hashmap and returns a reference to it
837 /// Performs case-insensitive lookup for parameter names
838 /// # Errors
839 /// Will return `Err` if the parameter name is not found in the `parameters` hashmap
840 pub fn find_parameter(&self, parameter_name: &str) -> Result<&Parameter> {
841 // Try exact match first (fast path)
842 if let Some(param) = self.parameters.get(parameter_name) {
843 return Ok(param);
844 }
845
846 // Case-insensitive fallback: search through parameter map
847 for (key, param) in self.parameters.iter() {
848 if key.eq_ignore_ascii_case(parameter_name) {
849 return Ok(param);
850 }
851 }
852
853 Err(anyhow!("Parameter not found: {parameter_name}"))
854 }
855
856 /// Looks for the parameter name as a key in the `parameters` hashmap and returns a mutable reference to it
857 /// Performs case-insensitive lookup for parameter names
858 /// # Errors
859 /// Will return `Err` if the parameter name is not found in the `parameters` hashmap
860 pub fn find_mutable_parameter(&mut self, parameter_name: &str) -> Result<&mut Parameter> {
861 // Try exact match first (fast path)
862 // Note: We need to check if the key exists as Arc<str>, so we iterate to find exact match
863 let exact_key = self
864 .parameters
865 .keys()
866 .find(|k| k.as_ref() == parameter_name)
867 .map(|k| k.clone());
868
869 if let Some(key) = exact_key {
870 return self
871 .parameters
872 .get_mut(&key)
873 .ok_or_else(|| anyhow!("Parameter not found: {parameter_name}"));
874 }
875
876 // Case-insensitive fallback: find the key first (clone Arc to avoid borrow issues)
877 let matching_key = self
878 .parameters
879 .keys()
880 .find(|key| key.eq_ignore_ascii_case(parameter_name))
881 .map(|k| k.clone());
882
883 if let Some(key) = matching_key {
884 return self
885 .parameters
886 .get_mut(&key)
887 .ok_or_else(|| anyhow!("Parameter not found: {parameter_name}"));
888 }
889
890 Err(anyhow!("Parameter not found: {parameter_name}"))
891 }
892
893 /// Returns a zero-copy reference to a Polars Float32Chunked view of a column for the parameter
894 ///
895 /// This provides access to the underlying Polars chunked array, which is useful
896 /// for operations that work directly with Polars types. For most use cases,
897 /// `get_parameter_events_slice()` is preferred as it provides a simple `&[f32]` slice.
898 ///
899 /// # Arguments
900 /// * `channel_name` - The channel name (e.g., "FSC-A", "FL1-A")
901 ///
902 /// # Errors
903 /// Will return `Err` if:
904 /// - the parameter name is not found in the parameters map
905 /// - the column data type is not Float32
906 pub fn get_parameter_events(&'_ self, channel_name: &str) -> Result<&Float32Chunked> {
907 Ok(self
908 .get_parameter_column(channel_name)?
909 .f32()
910 .map_err(|e| anyhow!("Parameter {} is not f32 type: {}", channel_name, e))?)
911 }
912 /// Get a reference to the Polars Column for a parameter by channel name
913 ///
914 /// This provides direct access to the underlying Polars column, which can be useful
915 /// for advanced operations that require the full Polars API.
916 ///
917 /// # Arguments
918 /// * `channel_name` - The channel name (e.g., "FSC-A", "FL1-A")
919 ///
920 /// # Errors
921 /// Will return `Err` if the parameter name is not found in the DataFrame
922 pub fn get_parameter_column(&'_ self, channel_name: &str) -> Result<&Column> {
923 self.data_frame
924 .column(channel_name)
925 .map_err(|e| anyhow!("Parameter {} not found: {}", channel_name, e))
926 }
927
928 /// Looks for the parameter name as a key in the 'parameters' hashmap and returns a new Vec<f32> of the raw event data
929 /// NOTE: This allocates a full copy of the events - prefer `get_parameter_events_slice` when possible
930 /// # Errors
931 /// Will return 'Err' if the parameter name is not found in the 'parameters hashmap or if the events are not found
932 pub fn get_parameter_events_as_owned_vec(&self, channel_name: &str) -> Result<Vec<EventDatum>> {
933 Ok(self.get_parameter_events_slice(channel_name)?.to_vec())
934 }
935
936 /// Returns the minimum and maximum values of the parameter
937 /// # Errors
938 /// Will return `Err` if the parameter name is not found in the 'parameters' hashmap or if the events are not found
939 pub fn get_minmax_of_parameter(&self, channel_name: &str) -> Result<(EventDatum, EventDatum)> {
940 let parameter = self.find_parameter(channel_name)?;
941 let events = self.get_parameter_events(¶meter.channel_name)?;
942
943 match events.iter().minmax() {
944 MinMaxResult::NoElements => Err(anyhow!("No elements found")),
945 MinMaxResult::OneElement(e) => Err(anyhow!("Only one element found: {:?}", e)),
946 MinMaxResult::MinMax(min, max) => {
947 let min_val = min.ok_or_else(|| anyhow!("Min value is None"))?;
948 let max_val = max.ok_or_else(|| anyhow!("Max value is None"))?;
949 Ok((min_val, max_val))
950 }
951 }
952 }
953
    // NOTE(review): the documentation below describes `generate_parameter_map`
    // (defined elsewhere in this file), but no such function follows it here —
    // as `///` doc comments, rustdoc would attach it to `format_diagnostic_info`,
    // which is misleading. Converted to plain comments; reattach to
    // `generate_parameter_map` when refactoring.
    //
    // generate_parameter_map: Creates a new `HashMap` of `Parameter`s using the
    // `Fcs` file's metadata to find the channel and label names from the `PnN`
    // and `PnS` keywords. Does NOT store events on the parameter.
    // Errors if:
    // - the number of parameters cannot be found in the metadata,
    // - the parameter name cannot be found in the metadata,
    // - the parameter cannot be built (using the Builder pattern)

    /// Format diagnostic information about FCS file state for error messages
963 fn format_diagnostic_info(header: &Header, metadata: &Metadata, path: &Path) -> String {
964 let mut lines = Vec::new();
965
966 lines.push("=== FCS File Diagnostic Information ===".to_string());
967 lines.push(format!("File path: {}", path.display()));
968 lines.push(format!("FCS Version: {}", header.version));
969 lines.push(format!(
970 "Text segment: {}..={}",
971 header.text_offset.start(),
972 header.text_offset.end()
973 ));
974 lines.push(format!(
975 "Data segment: {}..={}",
976 header.data_offset.start(),
977 header.data_offset.end()
978 ));
979 lines.push(format!(
980 "Analysis segment: {}..={}",
981 header.analysis_offset.start(),
982 header.analysis_offset.end()
983 ));
984 lines.push(format!(
985 "Delimiter: '{}' (0x{:02x})",
986 metadata.delimiter, metadata.delimiter as u8
987 ));
988
989 // Get number of parameters
990 let n_params = metadata
991 .get_number_of_parameters()
992 .map(|n| format!("{}", n))
993 .unwrap_or_else(|e| format!("Error: {}", e));
994 lines.push(format!("Number of parameters ($PAR): {}", n_params));
995
996 // Get number of events
997 let n_events = metadata
998 .get_number_of_events()
999 .map(|n| format!("{}", n))
1000 .unwrap_or_else(|e| format!("Error: {}", e));
1001 lines.push(format!("Number of events ($TOT): {}", n_events));
1002
1003 // List all parameter keywords
1004 lines.push("\n=== Parameter Keywords ===".to_string());
1005 let number_of_parameters = metadata.get_number_of_parameters().ok().copied();
1006 if let Some(n_params) = number_of_parameters {
1007 for param_num in 1..=n_params {
1008 let pn_keywords = [
1009 format!("$P{}N", param_num),
1010 format!("$P{}S", param_num),
1011 format!("$P{}B", param_num),
1012 format!("$P{}E", param_num),
1013 format!("$P{}R", param_num),
1014 ];
1015
1016 let mut found_keywords = Vec::new();
1017 for key in &pn_keywords {
1018 if let Some(kw) = metadata.keywords.get(key) {
1019 let value = match kw {
1020 crate::keyword::Keyword::String(sk) => sk.get_str().to_string(),
1021 crate::keyword::Keyword::Int(ik) => ik.get_usize().to_string(),
1022 crate::keyword::Keyword::Float(fk) => fk.to_string(),
1023 crate::keyword::Keyword::Byte(bk) => bk.get_str().to_string(),
1024 crate::keyword::Keyword::Mixed(mk) => mk.to_string(),
1025 };
1026 found_keywords.push(format!(" {} = {}", key, value));
1027 } else {
1028 found_keywords.push(format!(" {} = <MISSING>", key));
1029 }
1030 }
1031 lines.push(format!("Parameter {}:", param_num));
1032 lines.extend(found_keywords);
1033 }
1034 } else {
1035 lines.push(" Could not determine number of parameters".to_string());
1036 }
1037
1038 // List all keywords sorted
1039 lines.push("\n=== All Keywords (sorted) ===".to_string());
1040 let mut sorted_keys: Vec<_> = metadata.keywords.keys().collect();
1041 sorted_keys.sort();
1042 for key in sorted_keys {
1043 let kw = match metadata.keywords.get(key) {
1044 Some(kw) => kw,
1045 None => continue, // Skip missing keywords in diagnostic output
1046 };
1047 let value = match kw {
1048 crate::keyword::Keyword::String(sk) => sk.get_str().to_string(),
1049 crate::keyword::Keyword::Int(ik) => ik.get_usize().to_string(),
1050 crate::keyword::Keyword::Float(fk) => fk.to_string(),
1051 crate::keyword::Keyword::Byte(bk) => bk.get_str().to_string(),
1052 crate::keyword::Keyword::Mixed(mk) => mk.to_string(),
1053 };
1054 lines.push(format!(" {} = {}", key, value));
1055 }
1056
1057 lines.join("\n")
1058 }
1059
1060 pub fn generate_parameter_map(metadata: &Metadata) -> Result<ParameterMap> {
1061 let mut map = ParameterMap::default();
1062 let number_of_parameters = metadata.get_number_of_parameters()?;
1063 for parameter_number in 1..=*number_of_parameters {
1064 let channel_name = metadata.get_parameter_channel_name(parameter_number)
1065 .map_err(|e| anyhow!(
1066 "Failed to get channel name for parameter {}: {}\n\nHint: Check that $P{}N keyword exists in metadata.",
1067 parameter_number,
1068 e,
1069 parameter_number
1070 ))?;
1071
1072 // Use label name or fallback to the parameter name
1073 let label_name = match metadata.get_parameter_label(parameter_number) {
1074 Ok(label) => label,
1075 Err(_) => channel_name,
1076 };
1077
1078 let transform = if channel_name.contains("FSC")
1079 || channel_name.contains("SSC")
1080 || channel_name.contains("Time")
1081 {
1082 TransformType::Linear
1083 } else {
1084 TransformType::default()
1085 };
1086
1087 // Get excitation wavelength from metadata if available
1088 let excitation_wavelength = metadata
1089 .get_parameter_excitation_wavelength(parameter_number)
1090 .ok()
1091 .flatten();
1092
1093 let parameter = ParameterBuilder::default()
1094 // For the ParameterBuilder, ensure we're using the proper methods
1095 // that may be defined by the Builder derive macro
1096 .parameter_number(parameter_number)
1097 .channel_name(channel_name)
1098 .label_name(label_name)
1099 .transform(transform)
1100 .excitation_wavelength(excitation_wavelength)
1101 .build()?;
1102
1103 // Add the parameter events to the hashmap keyed by the parameter name
1104 map.insert(channel_name.to_string().into(), parameter);
1105 }
1106
1107 Ok(map)
1108 }
1109
1110 /// Looks for a keyword among the metadata and returns its value as a `&str`
1111 /// # Errors
1112 /// Will return `Err` if the `Keyword` is not found in the `metadata` or if the `Keyword` cannot be converted to a `&str`
1113 pub fn get_keyword_string_value(&self, keyword: &str) -> Result<Cow<'_, str>> {
1114 // TODO: This should be a match statement
1115 if let Ok(keyword) = self.metadata.get_string_keyword(keyword) {
1116 Ok(keyword.get_str())
1117 } else if let Ok(keyword) = self.metadata.get_integer_keyword(keyword) {
1118 Ok(keyword.get_str())
1119 } else if let Ok(keyword) = self.metadata.get_float_keyword(keyword) {
1120 Ok(keyword.get_str())
1121 } else if let Ok(keyword) = self.metadata.get_byte_keyword(keyword) {
1122 Ok(keyword.get_str())
1123 } else if let Ok(keyword) = self.metadata.get_mixed_keyword(keyword) {
1124 Ok(keyword.get_str())
1125 } else {
1126 Err(anyhow!("Keyword not found: {}", keyword))
1127 }
1128 }
1129 /// A convenience function to return the `GUID` keyword from the `metadata` as a `&str`
1130 /// # Errors
1131 /// Will return `Err` if the `GUID` keyword is not found in the `metadata` or if the `GUID` keyword cannot be converted to a `&str`
1132 pub fn get_guid(&self) -> Result<Cow<'_, str>> {
1133 Ok(self.metadata.get_string_keyword("GUID")?.get_str())
1134 }
1135
1136 /// Set or update the GUID keyword in the file's metadata
1137 pub fn set_guid(&mut self, guid: String) {
1138 self.metadata
1139 .insert_string_keyword("GUID".to_string(), guid);
1140 }
1141
1142 /// A convenience function to return the `$FIL` keyword from the `metadata` as a `&str`
1143 /// # Errors
1144 /// Will return `Err` if the `$FIL` keyword is not found in the `metadata` or if the `$FIL` keyword cannot be converted to a `&str`
1145 pub fn get_fil_keyword(&self) -> Result<Cow<'_, str>> {
1146 Ok(self.metadata.get_string_keyword("$FIL")?.get_str())
1147 }
1148
    /// A convenience function to return the `$TOT` keyword from the `metadata` as a `usize`
    ///
    /// Thin delegation: `Metadata` owns the parsed `$TOT` keyword, hence the
    /// borrowed `&usize` return.
    /// # Errors
    /// Will return `Err` if the `$TOT` keyword is not found in the `metadata` or if the `$TOT` keyword cannot be converted to a `usize`
    pub fn get_number_of_events(&self) -> Result<&usize> {
        self.metadata.get_number_of_events()
    }
1155
    /// A convenience function to return the `$PAR` keyword from the `metadata` as a `usize`
    ///
    /// Thin delegation: `Metadata` owns the parsed `$PAR` keyword, hence the
    /// borrowed `&usize` return.
    /// # Errors
    /// Will return `Err` if the `$PAR` keyword is not found in the `metadata` or if the `$PAR` keyword cannot be converted to a `usize`
    pub fn get_number_of_parameters(&self) -> Result<&usize> {
        self.metadata.get_number_of_parameters()
    }
1162
1163 // ==================== NEW POLARS-BASED ACCESSOR METHODS ====================
1164
1165 /// Get events for a parameter as a slice of f32 values
1166 /// Polars gives us direct access to the underlying buffer (zero-copy)
1167 /// # Errors
1168 /// Will return `Err` if:
1169 /// - the parameter name is not found
1170 /// - the Series data type is not Float32
1171 /// - the data is chunked (rare for FCS files)
1172 pub fn get_parameter_events_slice(&self, channel_name: &str) -> Result<&[f32]> {
1173 self.get_parameter_events(channel_name)?
1174 .cont_slice()
1175 .map_err(|e| anyhow!("Parameter {} data is not contiguous: {}", channel_name, e))
1176 }
1177
1178 /// Get two parameters as (x, y) pairs for plotting
1179 /// Optimized for scatter plot use case with zero allocations until the collect
1180 /// # Errors
1181 /// Will return `Err` if either parameter name is not found
1182 pub fn get_xy_pairs(&self, x_param: &str, y_param: &str) -> Result<Vec<(f32, f32)>> {
1183 let x_data = self.get_parameter_events_slice(x_param)?;
1184 let y_data = self.get_parameter_events_slice(y_param)?;
1185
1186 // Verify both parameters have the same length
1187 if x_data.len() != y_data.len() {
1188 return Err(anyhow!(
1189 "Parameter length mismatch: {} has {} events, {} has {} events",
1190 x_param,
1191 x_data.len(),
1192 y_param,
1193 y_data.len()
1194 ));
1195 }
1196
1197 // Zip is zero-cost abstraction - uses iterators efficiently
1198 Ok(x_data
1199 .iter()
1200 .zip(y_data.iter())
1201 .map(|(&x, &y)| (x, y))
1202 .collect())
1203 }
1204
    /// Get DataFrame height (number of events)
    ///
    /// One row per event in the backing Polars DataFrame.
    #[must_use]
    pub fn get_event_count_from_dataframe(&self) -> usize {
        self.data_frame.height()
    }
1210
    /// Get DataFrame width (number of parameters)
    ///
    /// One column per parameter in the backing Polars DataFrame.
    #[must_use]
    pub fn get_parameter_count_from_dataframe(&self) -> usize {
        self.data_frame.width()
    }
1216
1217 /// Get DataFrame column names (parameter names)
1218 pub fn get_parameter_names_from_dataframe(&self) -> Vec<String> {
1219 self.data_frame
1220 .get_column_names()
1221 .into_iter()
1222 .map(|s| s.to_string())
1223 .collect()
1224 }
1225
1226 /// Aggregate statistics for a parameter using Polars' streaming API for low memory usage and minimal, chunked passes.
1227 ///
1228 /// When streaming is enabled, Polars creates a *Pipeline*:
1229 ///
1230 /// **Source**: It pulls a chunk of data from the disk (e.g., 50,000 rows).
1231 ///
1232 /// **Operators**: It passes that chunk through your expressions (calculating the running sum, count, min, and max for that specific chunk).
1233 ///
1234 /// **Sink**: It aggregates the results from all chunks into a final result.
1235 ///
1236 /// Because the statistics we are calculating (min, max, mean) are *associative* and *commutative*, Polars can calculate them partially on each chunk and then combine them at the very end.
1237 ///
1238 /// Returns (min, max, mean, std_dev)
1239 /// # Errors
1240 /// Will return `Err` if the parameter is not found or stats calculation fails
1241 pub fn get_parameter_statistics(&self, channel_name: &str) -> Result<(f32, f32, f32, f32)> {
1242 let stats = (*self.data_frame)
1243 .clone()
1244 .lazy()
1245 .select([
1246 col(channel_name).min().alias("min"),
1247 col(channel_name).max().alias("max"),
1248 col(channel_name).mean().alias("mean"),
1249 col(channel_name).std(1).alias("std"),
1250 ])
1251 .collect_with_engine(Engine::Streaming)?;
1252 let min = stats
1253 .column("min")
1254 .map_err(|e| anyhow!("Column 'min' not found in statistics: {}", e))?
1255 .f32()
1256 .map_err(|e| anyhow!("Column 'min' is not f32 type: {}", e))?
1257 .get(0)
1258 .ok_or_else(|| anyhow!("No min value found"))?;
1259 let max = stats
1260 .column("max")
1261 .map_err(|e| anyhow!("Column 'max' not found in statistics: {}", e))?
1262 .f32()
1263 .map_err(|e| anyhow!("Column 'max' is not f32 type: {}", e))?
1264 .get(0)
1265 .ok_or_else(|| anyhow!("No max value found"))?;
1266 let mean = stats
1267 .column("mean")
1268 .map_err(|e| anyhow!("Column 'mean' not found in statistics: {}", e))?
1269 .f32()
1270 .map_err(|e| anyhow!("Column 'mean' is not f32 type: {}", e))?
1271 .get(0)
1272 .ok_or_else(|| anyhow!("No mean value found"))?;
1273 let std = stats
1274 .column("std")
1275 .map_err(|e| anyhow!("Column 'std' not found in statistics: {}", e))?
1276 .f32()
1277 .map_err(|e| anyhow!("Column 'std' is not f32 type: {}", e))?
1278 .get(0)
1279 .ok_or_else(|| anyhow!("No std deviation value found"))?;
1280
1281 Ok((min, max, mean, std))
1282 }
1283
1284 // ==================== TRANSFORMATION METHODS ====================
1285
1286 /// Apply arcsinh transformation to a parameter using Polars
1287 /// This is the most common transformation for flow cytometry data
1288 /// Formula: arcsinh(x / cofactor)
1289 ///
1290 /// # Arguments
1291 /// * `parameter_name` - Name of the parameter to transform
1292 /// * `cofactor` - Scaling factor (typical: 150-200 for modern instruments)
1293 ///
1294 /// # Returns
1295 /// New DataFrame with the transformed parameter
1296 pub fn apply_arcsinh_transform(
1297 &self,
1298 parameter_name: &str,
1299 cofactor: f32,
1300 ) -> Result<EventDataFrame> {
1301 let df = (*self.data_frame).clone();
1302
1303 // Get the column to transform
1304 let col = df
1305 .column(parameter_name)
1306 .map_err(|e| anyhow!("Parameter {} not found: {}", parameter_name, e))?;
1307
1308 let series = col.as_materialized_series();
1309 let ca = series
1310 .f32()
1311 .map_err(|e| anyhow!("Parameter {} is not f32: {}", parameter_name, e))?;
1312
1313 // Apply arcsinh transformation using TransformType implementation
1314 // The division by ln(10) was incorrectly converting to log10 scale,
1315 // which compressed the data ~2.3x and caused MAD to over-remove events
1316 use rayon::prelude::*;
1317 let transform = TransformType::Arcsinh { cofactor };
1318 let transformed: Vec<f32> = ca
1319 .cont_slice()
1320 .map_err(|e| anyhow!("Data not contiguous: {}", e))?
1321 .par_iter()
1322 .map(|&x| transform.transform(&x))
1323 .collect();
1324
1325 // Create new column with transformed data
1326 let new_series = Series::new(parameter_name.into(), transformed);
1327
1328 // Replace the column in DataFrame
1329 let mut new_df = df;
1330 new_df
1331 .replace(parameter_name, new_series.into())
1332 .map_err(|e| anyhow!("Failed to replace column: {}", e))?;
1333
1334 Ok(Arc::new(new_df))
1335 }
1336
1337 /// Apply arcsinh transformation to multiple parameters
1338 ///
1339 /// # Arguments
1340 /// * `parameters` - List of (parameter_name, cofactor) pairs
1341 ///
1342 /// # Returns
1343 /// New DataFrame with all specified parameters transformed
1344 pub fn apply_arcsinh_transforms(&self, parameters: &[(&str, f32)]) -> Result<EventDataFrame> {
1345 let mut df = (*self.data_frame).clone();
1346
1347 use rayon::prelude::*;
1348
1349 for &(param_name, cofactor) in parameters {
1350 let col = df
1351 .column(param_name)
1352 .map_err(|e| anyhow!("Parameter {} not found: {}", param_name, e))?;
1353
1354 let series = col.as_materialized_series();
1355 let ca = series
1356 .f32()
1357 .map_err(|e| anyhow!("Parameter {} is not f32: {}", param_name, e))?;
1358
1359 // Apply arcsinh transformation using TransformType implementation
1360 // Standard flow cytometry arcsinh - no division by ln(10)
1361 let transform = TransformType::Arcsinh { cofactor };
1362 let transformed: Vec<f32> = ca
1363 .cont_slice()
1364 .map_err(|e| anyhow!("Data not contiguous: {}", e))?
1365 .par_iter()
1366 .map(|&x| transform.transform(&x))
1367 .collect();
1368
1369 let new_series = Series::new(param_name.into(), transformed);
1370 df.replace(param_name, new_series.into())
1371 .map_err(|e| anyhow!("Failed to replace column {}: {}", param_name, e))?;
1372 }
1373
1374 Ok(Arc::new(df))
1375 }
1376
1377 /// Apply default arcsinh transformation to all fluorescence parameters
1378 /// Automatically detects fluorescence parameters (excludes FSC, SSC, Time)
1379 /// Uses cofactor = 200 (good default for modern instruments)
1380 pub fn apply_default_arcsinh_transform(&self) -> Result<EventDataFrame> {
1381 let param_names = self.get_parameter_names_from_dataframe();
1382
1383 // Filter to fluorescence parameters (exclude scatter and time)
1384 let fluor_params: Vec<(&str, f32)> = param_names
1385 .iter()
1386 .filter(|name| {
1387 let upper = name.to_uppercase();
1388 !upper.contains("FSC") && !upper.contains("SSC") && !upper.contains("TIME")
1389 })
1390 .map(|name| (name.as_str(), 2000.0)) // Default cofactor = 2000
1391 .collect();
1392
1393 self.apply_arcsinh_transforms(&fluor_params)
1394 }
1395
1396 /// Apply biexponential (logicle) transformation matching FlowJo defaults
1397 /// Automatically detects fluorescence parameters (excludes FSC, SSC, Time)
1398 /// Uses FlowJo default parameters: top_of_scale=262144 (18-bit), positive_decades=4.5, negative_decades=0, width=0.5
1399 pub fn apply_default_biexponential_transform(&self) -> Result<EventDataFrame> {
1400 let param_names = self.get_parameter_names_from_dataframe();
1401
1402 // Filter to fluorescence parameters (exclude scatter and time)
1403 let fluor_params: Vec<&str> = param_names
1404 .iter()
1405 .filter(|name| {
1406 let upper = name.to_uppercase();
1407 !upper.contains("FSC") && !upper.contains("SSC") && !upper.contains("TIME")
1408 })
1409 .map(|name| name.as_str())
1410 .collect();
1411
1412 let mut df = (*self.data_frame).clone();
1413
1414 use rayon::prelude::*;
1415
1416 // FlowJo default biexponential parameters
1417 let transform = TransformType::Biexponential {
1418 top_of_scale: 262144.0, // 18-bit data (2^18)
1419 positive_decades: 4.5,
1420 negative_decades: 0.0,
1421 width: 0.5,
1422 };
1423
1424 for param_name in fluor_params {
1425 let col = df
1426 .column(param_name)
1427 .map_err(|e| anyhow!("Parameter {} not found: {}", param_name, e))?;
1428
1429 let series = col.as_materialized_series();
1430 let ca = series
1431 .f32()
1432 .map_err(|e| anyhow!("Parameter {} is not f32: {}", param_name, e))?;
1433
1434 // Apply biexponential transformation using TransformType implementation
1435 let transformed: Vec<f32> = ca
1436 .cont_slice()
1437 .map_err(|e| anyhow!("Data not contiguous: {}", e))?
1438 .par_iter()
1439 .map(|&x| transform.transform(&x))
1440 .collect();
1441
1442 let new_series = Series::new(param_name.into(), transformed);
1443 df.replace(param_name, new_series.into())
1444 .map_err(|e| anyhow!("Failed to replace column {}: {}", param_name, e))?;
1445 }
1446
1447 Ok(Arc::new(df))
1448 }
1449
1450 // ==================== COMPENSATION METHODS ====================
1451
    /// Extract compensation matrix from $SPILLOVER keyword
    /// Returns (matrix, channel_names) if spillover keyword exists
    /// Returns None if no spillover keyword is present in the file
    ///
    /// Checks `$SPILLOVER` (FCS 3.1+), then `$SPILL` (unofficial), then `$COMP`
    /// (FCS 3.0). If the keyword was stored as a raw string because structured
    /// parsing failed earlier, a manual fallback parse is attempted here.
    ///
    /// # Returns
    /// Some((compensation_matrix, channel_names)) if spillover exists, None otherwise
    ///
    /// # Errors
    /// Will return `Err` if spillover keyword is malformed
    pub fn get_spillover_matrix(&self) -> Result<Option<(Mat<f32>, Vec<String>)>> {
        use crate::keyword::{Keyword, MixedKeyword};

        // Try to get compensation matrix from $SPILLOVER (FCS 3.1+), $SPILL (unofficial/custom), or $COMP (FCS 3.0)
        // Check in order of preference: SPILLOVER (official) > SPILL (common) > COMP (legacy)
        let spillover_keyword = self
            .metadata
            .keywords
            .get("$SPILLOVER")
            .or_else(|| self.metadata.keywords.get("$SPILL"))
            .or_else(|| self.metadata.keywords.get("$COMP"));

        let spillover_keyword = match spillover_keyword {
            // Happy path: the keyword was parsed into the structured SPILLOVER variant.
            Some(Keyword::Mixed(MixedKeyword::SPILLOVER {
                n_parameters,
                parameter_names,
                matrix_values,
            })) => (
                *n_parameters,
                parameter_names.clone(),
                matrix_values.clone(),
            ),
            Some(_) => {
                // Keyword exists but has wrong type - might be stored as String(Other) if parsing failed
                // Try to parse it manually
                // Re-derive which keyword name matched, in the same preference order as above.
                let keyword_name = if self.metadata.keywords.contains_key("$SPILLOVER") {
                    "$SPILLOVER"
                } else if self.metadata.keywords.contains_key("$SPILL") {
                    "$SPILL"
                } else if self.metadata.keywords.contains_key("$COMP") {
                    "$COMP"
                } else {
                    return Ok(None);
                };

                // Try to get the raw string value and parse it
                // This handles the case where $SPILL/$COMP was stored as String(Other) because
                // it wasn't recognized during initial parsing
                if let Some(Keyword::String(crate::keyword::StringKeyword::Other(value))) =
                    self.metadata.keywords.get(keyword_name)
                {
                    // Parse the string value as SPILLOVER using the same logic as parse_spillover
                    // Expected layout: n, name_1..name_n, then n*n row-major matrix values.
                    let parts: Vec<&str> = value.trim().split(',').collect();
                    if !parts.is_empty() {
                        if let Ok(n_parameters) = parts[0].trim().parse::<usize>() {
                            if parts.len() >= 1 + n_parameters {
                                let parameter_names: Vec<String> = parts[1..=n_parameters]
                                    .iter()
                                    .map(|s| s.trim().to_string())
                                    .collect();

                                let expected_matrix_size = n_parameters * n_parameters;
                                let matrix_start = 1 + n_parameters;

                                if parts.len() >= matrix_start + expected_matrix_size {
                                    // Parse matrix values.
                                    // NOTE(review): the replace(',', ".") below was meant to handle
                                    // comma decimal separators, but the string was already split on
                                    // ',' above, so a part can never contain a comma here — values
                                    // written as "0,5" would have been split into two parts. Confirm
                                    // whether comma-decimal spillover strings need different splitting.
                                    let mut matrix_values = Vec::new();
                                    for part in
                                        &parts[matrix_start..matrix_start + expected_matrix_size]
                                    {
                                        let cleaned = part.trim().replace(',', ".");
                                        if let Ok(val) = cleaned.parse::<f32>() {
                                            matrix_values.push(val);
                                        } else {
                                            break; // Failed to parse, give up
                                        }
                                    }

                                    // Only accept the fallback parse if every value parsed cleanly.
                                    if matrix_values.len() == expected_matrix_size {
                                        let matrix = Mat::from_fn(
                                            n_parameters,
                                            n_parameters,
                                            |i, j| matrix_values[i * n_parameters + j],
                                        );
                                        return Ok(Some((matrix, parameter_names)));
                                    }
                                }
                            }
                        }
                    }
                }

                return Err(anyhow!(
                    "{} keyword exists but has wrong type or could not be parsed",
                    keyword_name
                ));
            }
            None => {
                // No spillover keyword - this is fine, not all files have it
                return Ok(None);
            }
        };

        let (n_params, param_names, matrix_values): (usize, Vec<String>, Vec<f32>) =
            spillover_keyword;

        // Validate matrix dimensions
        let expected_matrix_size = n_params * n_params;
        if matrix_values.len() != expected_matrix_size {
            return Err(anyhow!(
                "SPILLOVER matrix size mismatch: expected {} values for {}x{} matrix, got {}",
                expected_matrix_size,
                n_params,
                n_params,
                matrix_values.len()
            ));
        }

        // Create Mat from matrix values (FCS spillover is stored row-major)
        let matrix =
            Mat::from_fn(n_params, n_params, |i, j| matrix_values[i * n_params + j]);

        Ok(Some((matrix, param_names)))
    }
1575
1576 /// Check if this file has compensation information
1577 #[must_use]
1578 pub fn has_compensation(&self) -> bool {
1579 self.get_spillover_matrix()
1580 .map(|opt| opt.is_some())
1581 .unwrap_or(false)
1582 }
1583
1584 /// Resolve spillover channel labels to actual $PnN channel names.
1585 /// Some instruments (e.g. IntelliCyt iQue3) use parameter numbers ("1","2","3") in
1586 /// $SPILLOVER instead of channel names; these are mapped via metadata.get_parameter_channel_name.
1587 fn resolve_spillover_channel_names(&self, channel_names: &[String]) -> Result<Vec<String>> {
1588 let mut resolved = Vec::with_capacity(channel_names.len());
1589 for name in channel_names {
1590 let resolved_name = if let Ok(param_idx) = name.trim().parse::<usize>() {
1591 if param_idx >= 1 {
1592 self.metadata
1593 .get_parameter_channel_name(param_idx)
1594 .map(|s| s.to_string())
1595 .map_err(|e| {
1596 anyhow!(
1597 "Parameter {} not found: spillover references \"{}\" but {}",
1598 param_idx,
1599 name,
1600 e
1601 )
1602 })?
1603 } else {
1604 name.clone()
1605 }
1606 } else {
1607 name.clone()
1608 };
1609 if !self.parameters.contains_key(resolved_name.as_str()) {
1610 return Err(anyhow!(
1611 "Spillover channel \"{}\" (resolved) not found in parameters",
1612 resolved_name
1613 ));
1614 }
1615 resolved.push(resolved_name);
1616 }
1617 Ok(resolved)
1618 }
1619
1620 /// Apply compensation from the file's $SPILLOVER keyword
1621 /// Convenience method that extracts spillover and applies it automatically
1622 ///
1623 /// Resolves spillover channel labels: some instruments (e.g. IntelliCyt iQue3) use
1624 /// parameter numbers ("1", "2", "3") in $SPILLOVER instead of $PnN names; these are
1625 /// mapped to actual channel names via metadata.
1626 ///
1627 /// # Returns
1628 /// New DataFrame with compensated data, or error if no spillover keyword exists
1629 pub fn apply_file_compensation(&self) -> Result<EventDataFrame> {
1630 let (comp_matrix, channel_names) = self
1631 .get_spillover_matrix()?
1632 .ok_or_else(|| anyhow!("No $SPILLOVER keyword found in FCS file"))?;
1633
1634 let channel_refs: Vec<String> = self.resolve_spillover_channel_names(&channel_names)?;
1635 let channel_refs: Vec<&str> = channel_refs.iter().map(|s| s.as_str()).collect();
1636
1637 self.apply_compensation(comp_matrix.as_ref(), &channel_refs)
1638 }
1639
1640 /// OPTIMIZED: Get compensated data for specific parameters only (lazy/partial compensation)
1641 ///
1642 /// This is 15-30x faster than apply_file_compensation when you only need a few parameters
1643 /// because it:
1644 /// - Only compensates the requested channels (e.g., 2 vs 30)
1645 /// - Uses sparse matrix optimization for matrices with >80% zeros
1646 /// - Bypasses compensation entirely for identity matrices
1647 ///
1648 /// # Arguments
1649 /// * `channels_needed` - Only the channel names you need compensated (typically 2 for a plot)
1650 ///
1651 /// # Returns
1652 /// HashMap of channel_name -> compensated data (as Vec<f32>)
1653 ///
1654 /// # Performance
1655 /// - Dense matrix (2/30 channels): **15x faster** (150ms → 10ms)
1656 /// - Sparse matrix (90% sparse): **50x faster** (150ms → 3ms)
1657 /// - Identity matrix: **300x faster** (150ms → 0.5ms)
1658 pub fn get_compensated_parameters(
1659 &self,
1660 channels_needed: &[&str],
1661 ) -> Result<std::collections::HashMap<String, Vec<f32>>> {
1662 use std::collections::HashMap;
1663
1664 // Get spillover matrix
1665 let (comp_matrix, matrix_channel_names) = self
1666 .get_spillover_matrix()?
1667 .ok_or_else(|| anyhow!("No $SPILLOVER keyword found in FCS file"))?;
1668
1669 let n_events = self.get_event_count_from_dataframe();
1670
1671 // OPTIMIZATION 1: Check if matrix is identity (no compensation needed)
1672 let is_identity = {
1673 let mut is_id = true;
1674 for i in 0..comp_matrix.nrows() {
1675 for j in 0..comp_matrix.ncols() {
1676 let expected = if i == j { 1.0 } else { 0.0 };
1677 if (comp_matrix[(i, j)] - expected).abs() > 1e-6 {
1678 is_id = false;
1679 break;
1680 }
1681 }
1682 if !is_id {
1683 break;
1684 }
1685 }
1686 is_id
1687 };
1688
1689 if is_identity {
1690 eprintln!("🚀 Identity matrix detected - bypassing compensation");
1691 // Just return original data
1692 let mut result = HashMap::new();
1693 for &channel in channels_needed {
1694 let data = self.get_parameter_events_slice(channel)?;
1695 result.insert(channel.to_string(), data.to_vec());
1696 }
1697 return Ok(result);
1698 }
1699
1700 // OPTIMIZATION 2: Analyze sparsity
1701 let total_elements = comp_matrix.nrows() * comp_matrix.ncols();
1702 let mut non_zero_count = 0;
1703 for i in 0..comp_matrix.nrows() {
1704 for j in 0..comp_matrix.ncols() {
1705 if comp_matrix[(i, j)].abs() > 1e-6 {
1706 non_zero_count += 1;
1707 }
1708 }
1709 }
1710 let sparsity = 1.0 - (non_zero_count as f64 / total_elements as f64);
1711 let is_sparse = sparsity > 0.8;
1712
1713 eprintln!(
1714 "📊 Compensation matrix: {:.1}% sparse, {} non-zero coefficients",
1715 sparsity * 100.0,
1716 non_zero_count
1717 );
1718
1719 // Find indices of channels we need
1720 let channel_indices: HashMap<&str, usize> = matrix_channel_names
1721 .iter()
1722 .enumerate()
1723 .map(|(i, name)| (name.as_str(), i))
1724 .collect();
1725
1726 let needed_indices: Vec<(String, usize)> = channels_needed
1727 .iter()
1728 .filter_map(|&ch| channel_indices.get(ch).map(|&idx| (ch.to_string(), idx)))
1729 .collect();
1730
1731 if needed_indices.is_empty() {
1732 return Err(anyhow!(
1733 "None of the requested channels found in compensation matrix"
1734 ));
1735 }
1736
1737 // Extract ONLY the channels involved in compensating our needed channels
1738 // For each needed channel, we need all channels that have non-zero spillover
1739 let mut involved_indices = std::collections::HashSet::new();
1740 for &(_, row_idx) in &needed_indices {
1741 // Add the channel itself
1742 involved_indices.insert(row_idx);
1743
1744 // Add channels with non-zero spillover
1745 if is_sparse {
1746 for col_idx in 0..comp_matrix.ncols() {
1747 if comp_matrix[(row_idx, col_idx)].abs() > 1e-6 {
1748 involved_indices.insert(col_idx);
1749 }
1750 }
1751 } else {
1752 // For dense matrix, we need all channels
1753 for i in 0..comp_matrix.ncols() {
1754 involved_indices.insert(i);
1755 }
1756 }
1757 }
1758
1759 let mut involved_vec: Vec<usize> = involved_indices.into_iter().collect();
1760 involved_vec.sort_unstable();
1761
1762 eprintln!(
1763 "🎯 Lazy compensation: loading {} channels (vs {} total)",
1764 involved_vec.len(),
1765 matrix_channel_names.len()
1766 );
1767
1768 // Extract data for involved channels only
1769 let mut channel_data: Vec<Vec<f32>> = Vec::with_capacity(involved_vec.len());
1770 for &idx in &involved_vec {
1771 let channel_name = &matrix_channel_names[idx];
1772 let data = self.get_parameter_events_slice(channel_name)?;
1773 channel_data.push(data.to_vec());
1774 }
1775
1776 // Extract sub-matrix for involved channels
1777 let sub_matrix = Mat::from_fn(involved_vec.len(), involved_vec.len(), |i, j| {
1778 comp_matrix[(involved_vec[i], involved_vec[j])]
1779 });
1780
1781 // Use CPU compensation (benchmarked: GPU was slower due to transfer overhead)
1782 // Invert sub-matrix using faer (pure Rust, no system BLAS)
1783 let comp_inv = crate::matrix::MatrixOps::invert_matrix(sub_matrix.as_ref())
1784 .map_err(|e| anyhow!("Failed to invert compensation matrix: {:?}", e))?;
1785
1786 // Compensate ONLY the involved channels
1787 use rayon::prelude::*;
1788 let n_involved = involved_vec.len();
1789 let compensated_data: Vec<Vec<f32>> = (0..n_involved)
1790 .into_par_iter()
1791 .map(|i| {
1792 let mut result = vec![0.0; n_events];
1793 for event_idx in 0..n_events {
1794 let mut sum = 0.0;
1795 for j in 0..n_involved {
1796 sum += comp_inv[(i, j)] * channel_data[j][event_idx];
1797 }
1798 result[event_idx] = sum;
1799 }
1800 result
1801 })
1802 .collect();
1803
1804 // Build result HashMap for only the channels we need
1805 let mut result = HashMap::new();
1806 for (channel_name, orig_idx) in needed_indices {
1807 if let Some(local_idx) = involved_vec.iter().position(|&x| x == orig_idx) {
1808 result.insert(channel_name, compensated_data[local_idx].clone());
1809 }
1810 }
1811
1812 eprintln!("🚀 Lazy compensation completed");
1813 Ok(result)
1814 }
1815
1816 /// Apply compensation matrix to the data using Polars
1817 /// Compensation corrects for spectral overlap between fluorescence channels
1818 ///
1819 /// # Arguments
1820 /// * `compensation_matrix` - 2D matrix where element [i,j] represents spillover from channel j into channel i
1821 /// * `channel_names` - Names of channels in the order they appear in the matrix
1822 ///
1823 /// # Returns
1824 /// New DataFrame with compensated fluorescence values
1825 ///
1826 /// # Example
1827 /// ```ignore
1828 /// // Create a 3x3 compensation matrix
1829 /// use faer::mat;
1830 /// let comp_matrix = mat![
1831 /// [1.0, 0.1, 0.05], // FL1-A compensation
1832 /// [0.2, 1.0, 0.1], // FL2-A compensation
1833 /// [0.1, 0.15, 1.0], // FL3-A compensation
1834 /// ];
1835 /// let channels = vec!["FL1-A", "FL2-A", "FL3-A"];
1836 /// let compensated = fcs.apply_compensation(comp_matrix.as_ref(), &channels)?;
1837 /// ```
1838 pub fn apply_compensation(
1839 &self,
1840 compensation_matrix: faer::MatRef<'_, f32>,
1841 channel_names: &[&str],
1842 ) -> Result<EventDataFrame> {
1843 let comp = compensation_matrix;
1844 // Verify matrix dimensions match channel names
1845 let n_channels = channel_names.len();
1846 if comp.nrows() != n_channels || comp.ncols() != n_channels {
1847 return Err(anyhow!(
1848 "Compensation matrix dimensions ({}, {}) don't match number of channels ({})",
1849 comp.nrows(),
1850 comp.ncols(),
1851 n_channels
1852 ));
1853 }
1854
1855 // Extract data for channels to compensate
1856 let mut channel_data: Vec<Vec<f32>> = Vec::with_capacity(n_channels);
1857 let n_events = self.get_event_count_from_dataframe();
1858
1859 for &channel_name in channel_names {
1860 let data = self.get_parameter_events_slice(channel_name)?;
1861 channel_data.push(data.to_vec());
1862 }
1863
1864 // Use CPU compensation (benchmarked: GPU was slower due to transfer overhead)
1865 // Apply compensation: compensated = original * inverse(compensation_matrix)
1866 // For efficiency, we pre-compute the inverse using faer (pure Rust, no system BLAS)
1867 let comp_inv =
1868 crate::matrix::MatrixOps::invert_matrix(comp).map_err(|e| {
1869 anyhow!("Failed to invert compensation matrix: {:?}", e)
1870 })?;
1871
1872 // Perform matrix multiplication for each event
1873 use rayon::prelude::*;
1874 let compensated_data: Vec<Vec<f32>> = (0..n_channels)
1875 .into_par_iter()
1876 .map(|i| {
1877 let mut result = vec![0.0; n_events];
1878 for event_idx in 0..n_events {
1879 let mut sum = 0.0;
1880 for j in 0..n_channels {
1881 sum += comp_inv[(i, j)] * channel_data[j][event_idx];
1882 }
1883 result[event_idx] = sum;
1884 }
1885 result
1886 })
1887 .collect();
1888
1889 // Create new DataFrame with compensated values
1890 let mut df = (*self.data_frame).clone();
1891
1892 for (i, &channel_name) in channel_names.iter().enumerate() {
1893 let new_series = Series::new(channel_name.into(), compensated_data[i].clone());
1894 df.replace(channel_name, new_series.into())
1895 .map_err(|e| anyhow!("Failed to replace column {}: {}", channel_name, e))?;
1896 }
1897
1898 Ok(Arc::new(df))
1899 }
1900
1901 /// Apply spectral unmixing (matrix solve only, no compensation or transformation)
1902 ///
1903 /// This method performs unmixing by solving: observation = mixing_matrix × abundances
1904 /// Does NOT apply compensation or transformations - these should be done separately.
1905 ///
1906 /// For overdetermined systems (more detectors than endmembers), uses least squares.
1907 ///
1908 /// # Arguments
1909 /// * `unmixing_matrix` - Matrix describing spectral signatures of fluorophores (detectors × endmembers)
1910 /// * `detector_names` - Names of detector channels (must match matrix rows)
1911 /// * `endmember_names` - Optional names for endmembers (columns). If None, uses "Endmember1", "Endmember2", etc.
1912 ///
1913 /// # Returns
1914 /// New DataFrame with unmixed endmember abundances (columns named by endmember names or indices)
1915 ///
1916 /// # Errors
1917 /// Returns error if detector names don't match matrix dimensions or data cannot be extracted
1918 pub fn apply_spectral_unmixing(
1919 &self,
1920 unmixing_matrix: faer::MatRef<'_, f32>,
1921 detector_names: &[&str],
1922 endmember_names: Option<&[&str]>,
1923 ) -> Result<EventDataFrame> {
1924 use faer::linalg::solvers::{Qr, SolveLstsq};
1925
1926 // Verify matrix dimensions match detector names
1927 let n_detectors = detector_names.len();
1928 if unmixing_matrix.nrows() != n_detectors {
1929 return Err(anyhow!(
1930 "Unmixing matrix rows ({}) don't match number of detectors ({})",
1931 unmixing_matrix.nrows(),
1932 n_detectors
1933 ));
1934 }
1935
1936 let n_endmembers = unmixing_matrix.ncols();
1937 let n_events = self.get_event_count_from_dataframe();
1938
1939 // Extract data for detectors
1940 let mut detector_data: Vec<Vec<f32>> = Vec::with_capacity(n_detectors);
1941 for &detector_name in detector_names {
1942 let data = self.get_parameter_events_slice(detector_name)?;
1943 detector_data.push(data.to_vec());
1944 }
1945
1946 // Observations matrix: events × detectors
1947 let observations =
1948 Mat::from_fn(n_events, n_detectors, |event_idx, detector_idx| {
1949 detector_data[detector_idx][event_idx]
1950 });
1951
1952 // Perform unmixing: for each event, solve: observation = unmixing_matrix × abundances
1953 // For overdetermined systems (n_detectors > n_endmembers), use QR least squares (faer)
1954 let qr = Qr::new(unmixing_matrix);
1955
1956 let mut unmixed_data: Vec<Vec<f32>> = Vec::with_capacity(n_events);
1957 for event_idx in 0..n_events {
1958 let b_col =
1959 Mat::from_fn(n_detectors, 1, |i, _| observations[(event_idx, i)]);
1960 let x_faer = qr.solve_lstsq(b_col.as_ref());
1961 let abundances: Vec<f32> =
1962 (0..n_endmembers).map(|i| x_faer[(i, 0)]).collect();
1963 unmixed_data.push(abundances);
1964 }
1965
1966 // Create DataFrame with endmember abundances
1967 let mut columns: Vec<Column> = Vec::with_capacity(n_endmembers);
1968 for endmember_idx in 0..n_endmembers {
1969 let values: Vec<f32> = unmixed_data
1970 .iter()
1971 .map(|abundances| abundances[endmember_idx])
1972 .collect();
1973
1974 // Use provided endmember name if available, otherwise use generic "Endmember{i}" format
1975 let column_name = if let Some(names) = endmember_names {
1976 names[endmember_idx].to_string()
1977 } else {
1978 format!("Endmember{}", endmember_idx + 1)
1979 };
1980
1981 let series = Column::new(column_name.into(), values);
1982 columns.push(series);
1983 }
1984
1985 let df = DataFrame::new(n_events, columns)
1986 .map_err(|e| anyhow!("Failed to create DataFrame: {}", e))?;
1987
1988 Ok(Arc::new(df))
1989 }
1990}