// flow_fcs/file.rs
1// Internal crate imports
2use crate::{
3 TransformType,
4 byteorder::ByteOrder,
5 header::Header,
6 keyword::{IntegerableKeyword, StringableKeyword},
7 metadata::Metadata,
8 parameter::{EventDataFrame, EventDatum, Parameter, ParameterBuilder, ParameterMap},
9};
10use rayon::iter::IntoParallelRefIterator;
// Standard library imports (note: `ndarray_linalg` below is an external crate)
12use ndarray_linalg::Inverse;
13use std::borrow::Cow;
14use std::fs::File;
15use std::ops::Deref;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18
19// External crate imports
20use anyhow::{Result, anyhow};
21use itertools::{Itertools, MinMaxResult};
22use memmap3::{Mmap, MmapOptions};
23use ndarray::Array2;
24use polars::prelude::*;
25use rayon::iter::ParallelIterator;
26use rayon::slice::ParallelSlice;
27
/// A shareable wrapper around the file path and memory-map
///
/// Uses Arc<Mmap> to share the memory mapping across clones without creating
/// new file descriptors or memory mappings. This is more efficient than cloning
/// the underlying file descriptor and re-mapping — `Clone` is just a refcount
/// bump plus a `PathBuf` copy.
#[derive(Debug, Clone)]
pub struct AccessWrapper {
    /// An owned, mutable path to the file on disk
    pub path: PathBuf,
    /// The memory-mapped file, shared via Arc for efficient cloning
    ///
    /// # Safety
    /// The Mmap is created from a File handle and remains valid as long as:
    /// 1. The file is not truncated while mapped
    /// 2. The file contents are not modified while mapped (we only read)
    /// 3. The Mmap is not accessed after the file is deleted
    ///
    /// Our usage satisfies these invariants because:
    /// - FCS files are read-only once opened (we never write back to them)
    /// - The file remains open (via File handle) for the lifetime of the Mmap
    /// - We only drop the Mmap when the FCS file is no longer needed
    ///
    /// NOTE(review): the invariants above assume no *external* process modifies
    /// the file while it is mapped — confirm that is acceptable for callers.
    pub mmap: Arc<Mmap>,
}
51
52impl AccessWrapper {
53 /// Creates a new `AccessWrapper` from a file path
54 /// # Errors
55 /// Will return `Err` if:
56 /// - the file cannot be opened
57 /// - the file cannot be memory-mapped
58 pub fn new(path: &str) -> Result<Self> {
59 let file = File::open(path)?;
60 let path = PathBuf::from(path);
61
62 // memmap3 provides better safety guarantees than memmap2, though OS-level
63 // memory mapping still requires unsafe at creation time. The resulting Mmap
64 // is safe to use and provides better guarantees than memmap2.
65 let mmap = unsafe { MmapOptions::new().map(&file)? };
66
67 Ok(Self {
68 path,
69 mmap: Arc::new(mmap),
70 })
71 }
72}
73
impl Deref for AccessWrapper {
    type Target = Mmap;

    // Dereferencing an `AccessWrapper` yields the shared memory map, so the
    // wrapper can be used anywhere a `&Mmap` (and, transitively, `&[u8]`) is
    // expected.
    fn deref(&self) -> &Self::Target {
        &self.mmap
    }
}
81
/// A struct representing an FCS file
#[derive(Debug, Clone)]
pub struct Fcs {
    /// The header segment of the fcs file, including the version, and byte offsets to the text, data, and analysis segments
    pub header: Header,
    /// The metadata segment of the fcs file, including the delimiter, and a hashmap of keyword/value pairs
    pub metadata: Metadata,
    /// A hashmap of the parameter names and their associated metadata
    pub parameters: ParameterMap,

    /// Event data stored in columnar format via a Polars DataFrame
    /// Each column represents one parameter (e.g., FSC-A, SSC-A, FL1-A)
    /// Polars provides:
    /// - Zero-copy column access
    /// - Built-in SIMD operations
    /// - Lazy evaluation for complex queries
    /// - Apache Arrow interop
    /// This is the primary data format going forward
    pub data_frame: EventDataFrame,

    /// A wrapper around the file, path, and memory-map the data was read from
    pub file_access: AccessWrapper,
}
105
106impl Fcs {
    /// Creates a new, empty `Fcs` struct with default header, metadata, and
    /// parameter map and an empty DataFrame.
    ///
    /// # Errors
    /// Will return `Err` if the backing `AccessWrapper` cannot be created.
    ///
    /// NOTE(review): `AccessWrapper::new("")` attempts to open the empty path,
    /// which fails on every common platform — so this constructor almost
    /// certainly always returns `Err` as written. Consider making `file_access`
    /// optional or lazily initialized — TODO confirm intended behavior.
    pub fn new() -> Result<Self> {
        Ok(Self {
            header: Header::new(),
            metadata: Metadata::new(),
            parameters: ParameterMap::default(),
            data_frame: Arc::new(DataFrame::empty()),
            file_access: AccessWrapper::new("")?,
        })
    }
124
125 /// Opens and parses an FCS file from the given path
126 ///
127 /// This is the primary entry point for reading FCS files. It:
128 /// - Validates the file extension (must be `.fcs`)
129 /// - Memory-maps the file for efficient access
130 /// - Parses the header segment to determine FCS version and segment offsets
131 /// - Parses the text segment to extract metadata and keywords
132 /// - Validates required keywords for the FCS version
133 /// - Generates a GUID if one is not present
134 /// - Loads event data into a Polars DataFrame for efficient columnar access
135 ///
136 /// # Arguments
137 /// * `path` - Path to the FCS file (must have `.fcs` extension)
138 ///
139 /// # Errors
140 /// Will return `Err` if:
141 /// - the file cannot be opened or memory-mapped
142 /// - the file extension is not `.fcs`
143 /// - the FCS version is invalid or unsupported
144 /// - required keywords are missing for the FCS version
145 /// - the data segment cannot be read or parsed
146 /// - parameter metadata cannot be generated
147 ///
148 /// # Example
149 /// ```no_run
150 /// use flow_fcs::Fcs;
151 ///
152 /// let fcs = Fcs::open("data/sample.fcs")?;
153 /// println!("File has {} events", fcs.get_number_of_events()?);
154 /// # Ok::<(), anyhow::Error>(())
155 /// ```
156 pub fn open(path: &str) -> Result<Self> {
157 // Attempt to open the file path
158 let file_access = AccessWrapper::new(path).expect("Should be able make new access wrapper");
159
160 // Validate the file extension
161 Self::validate_fcs_extension(&file_access.path)
162 .expect("Should have a valid file extension");
163
164 // Create header and metadata structs from a memory map of the file
165 let header = Header::from_mmap(&file_access.mmap)
166 .expect("Should be able to create header from mmap");
167 let mut metadata = Metadata::from_mmap(&file_access.mmap, &header);
168
169 metadata
170 .validate_text_segment_keywords(&header)
171 .expect("Should have valid text segment keywords");
172 // metadata.validate_number_of_parameters()?;
173 metadata.validate_guid();
174
175 Ok(Self {
176 parameters: Self::generate_parameter_map(&metadata)
177 .expect("Should be able to generate parameter map"),
178 data_frame: Self::store_raw_data_as_dataframe(&header, &file_access.mmap, &metadata)
179 .expect("Should be able to store raw data as DataFrame"),
180 file_access,
181 header,
182 metadata,
183 })
184 }
185
186 /// Validates that the file extension is `.fcs`
187 /// # Errors
188 /// Will return `Err` if the file extension is not `.fcs`
189 fn validate_fcs_extension(path: &Path) -> Result<()> {
190 let extension = path
191 .extension()
192 .ok_or_else(|| anyhow!("File has no extension"))?
193 .to_str()
194 .ok_or_else(|| anyhow!("File extension is not valid UTF-8"))?;
195
196 if extension.to_lowercase() != "fcs" {
197 return Err(anyhow!("Invalid file extension: {}", extension));
198 }
199
200 Ok(())
201 }
202
203 /// Reads raw data from FCS file and stores it as a Polars DataFrame
204 /// Returns columnar data optimized for parameter-wise access patterns
205 ///
206 /// This function provides significant performance benefits over ndarray:
207 /// - 2-5x faster data filtering and transformations
208 /// - Native columnar storage (optimal for FCS parameter access patterns)
209 /// - Zero-copy operations via Apache Arrow
210 /// - Built-in SIMD acceleration
211 ///
212 /// # Errors
213 /// Will return `Err` if:
214 /// - The data cannot be read
215 /// - The data cannot be converted to f32 values
216 /// - The DataFrame cannot be constructed
217 fn store_raw_data_as_dataframe(
218 header: &Header,
219 mmap: &Mmap,
220 metadata: &Metadata,
221 ) -> Result<EventDataFrame> {
222 // Validate data offset bounds before accessing mmap
223 let mut data_start = *header.data_offset.start();
224 let mut data_end = *header.data_offset.end();
225 let mmap_len = mmap.len();
226
227 // Handle zero offsets by checking keywords
228 if data_start == 0 {
229 if let Ok(begin_data) = metadata.get_integer_keyword("$BEGINDATA") {
230 data_start = begin_data.get_usize().clone();
231 } else {
232 return Err(anyhow!(
233 "$BEGINDATA keyword not found. Unable to determine data start."
234 ));
235 }
236 }
237
238 if data_end == 0 {
239 if let Ok(end_data) = metadata.get_integer_keyword("$ENDDATA") {
240 data_end = end_data.get_usize().clone();
241 } else {
242 return Err(anyhow!(
243 "$ENDDATA keyword not found. Unable to determine data end."
244 ));
245 }
246 }
247
248 // Validate offsets
249 if data_start >= mmap_len {
250 return Err(anyhow!(
251 "Data start offset {} is beyond mmap length {}",
252 data_start,
253 mmap_len
254 ));
255 }
256
257 if data_end >= mmap_len {
258 return Err(anyhow!(
259 "Data end offset {} is beyond mmap length {}",
260 data_end,
261 mmap_len
262 ));
263 }
264
265 if data_start > data_end {
266 return Err(anyhow!(
267 "Data start offset {} is greater than end offset {}",
268 data_start,
269 data_end
270 ));
271 }
272
273 // Extract data bytes
274 let data_bytes = &mmap[data_start..=data_end];
275
276 let number_of_parameters = metadata
277 .get_number_of_parameters()
278 .expect("Should be able to retrieve the number of parameters");
279 let number_of_events = metadata
280 .get_number_of_events()
281 .expect("Should be able to retrieve the number of events");
282 let bytes_per_event = metadata
283 .get_data_type()
284 .expect("Should be able to get the data type")
285 .get_bytes_per_event();
286 let byte_order = metadata
287 .get_byte_order()
288 .expect("Should be able to get the byte order");
289
290 // Validate data size
291 let expected_total_bytes = number_of_events * number_of_parameters * bytes_per_event;
292 if data_bytes.len() < expected_total_bytes {
293 return Err(anyhow!(
294 "Insufficient data: expected {} bytes, but only have {} bytes",
295 expected_total_bytes,
296 data_bytes.len()
297 ));
298 }
299
300 // Confirm that data_bytes is a multiple of 4 bytes
301 if data_bytes.len() % 4 != 0 {
302 return Err(anyhow!(
303 "Data bytes length {} is not a multiple of 4",
304 data_bytes.len()
305 ));
306 }
307
308 // Parse f32 values using the same optimized approach as store_raw_data
309 let f32_values: Vec<f32> = {
310 let needs_swap = match (byte_order, cfg!(target_endian = "little")) {
311 (ByteOrder::LittleEndian, true) | (ByteOrder::BigEndian, false) => false,
312 _ => true,
313 };
314
315 match bytemuck::try_cast_slice::<u8, f32>(data_bytes) {
316 Ok(f32_slice) => {
317 #[cfg(debug_assertions)]
318 eprintln!(
319 "✓ Zero-copy path (DataFrame): aligned data ({} bytes, {} f32s)",
320 data_bytes.len(),
321 f32_slice.len()
322 );
323
324 if needs_swap {
325 f32_slice
326 .par_iter()
327 .map(|&f| f32::from_bits(f.to_bits().swap_bytes()))
328 .collect()
329 } else {
330 f32_slice.to_vec()
331 }
332 }
333 Err(_) => {
334 #[cfg(debug_assertions)]
335 eprintln!(
336 "⚠ Fallback path (DataFrame): unaligned data ({} bytes)",
337 data_bytes.len()
338 );
339
340 data_bytes
341 .par_chunks_exact(4)
342 .map(|chunk| {
343 let mut bytes = [0u8; 4];
344 bytes.copy_from_slice(chunk);
345 let bits = u32::from_ne_bytes(bytes);
346 let bits = if needs_swap { bits.swap_bytes() } else { bits };
347 f32::from_bits(bits)
348 })
349 .collect()
350 }
351 }
352 };
353
354 // Validate f32 count
355 let expected_f32_values = number_of_events * number_of_parameters;
356 if f32_values.len() != expected_f32_values {
357 return Err(anyhow!(
358 "F32 values count mismatch: expected {} but got {}",
359 expected_f32_values,
360 f32_values.len()
361 ));
362 }
363
364 // Create Polars Series for each parameter (column)
365 // FCS data is stored row-wise (event1_param1, event1_param2, ..., event2_param1, ...)
366 // We need to extract columns using stride access
367 let mut columns: Vec<Column> = Vec::with_capacity(*number_of_parameters);
368
369 for param_idx in 0..*number_of_parameters {
370 // Extract this parameter's values across all events
371 // Use iterator with step_by for efficient stride access
372 let param_values: Vec<f32> = f32_values
373 .iter()
374 .skip(param_idx)
375 .step_by(*number_of_parameters)
376 .copied()
377 .collect();
378
379 // Verify we got the right number of events
380 assert_eq!(
381 param_values.len(),
382 *number_of_events,
383 "Parameter {} should have {} events, got {}",
384 param_idx + 1,
385 number_of_events,
386 param_values.len()
387 );
388
389 // Get parameter name from metadata for column name
390 let param_name = metadata
391 .get_parameter_channel_name(param_idx + 1)
392 .map(|s| s.to_string())
393 .unwrap_or_else(|_| format!("P{}", param_idx + 1));
394
395 // Create Series (Polars column) with name
396 let series = Column::new(param_name.as_str().into(), param_values);
397 columns.push(series);
398 }
399
400 // Create DataFrame from columns
401 let df = DataFrame::new(columns).map_err(|e| {
402 anyhow!(
403 "Failed to create DataFrame from {} columns: {}",
404 number_of_parameters,
405 e
406 )
407 })?;
408
409 // Verify DataFrame shape
410 assert_eq!(
411 df.height(),
412 *number_of_events,
413 "DataFrame height {} doesn't match expected events {}",
414 df.height(),
415 number_of_events
416 );
417 assert_eq!(
418 df.width(),
419 *number_of_parameters,
420 "DataFrame width {} doesn't match expected parameters {}",
421 df.width(),
422 number_of_parameters
423 );
424
425 #[cfg(debug_assertions)]
426 eprintln!(
427 "✓ Created DataFrame: {} events × {} parameters",
428 df.height(),
429 df.width()
430 );
431
432 Ok(Arc::new(df))
433 }
434
435 /// Looks for the parameter name as a key in the `parameters` hashmap and returns a reference to it
436 /// Performs case-insensitive lookup for parameter names
437 /// # Errors
438 /// Will return `Err` if the parameter name is not found in the `parameters` hashmap
439 pub fn find_parameter(&self, parameter_name: &str) -> Result<&Parameter> {
440 // Try exact match first (fast path)
441 if let Some(param) = self.parameters.get(parameter_name) {
442 return Ok(param);
443 }
444
445 // Case-insensitive fallback: search through parameter map
446 for (key, param) in self.parameters.iter() {
447 if key.eq_ignore_ascii_case(parameter_name) {
448 return Ok(param);
449 }
450 }
451
452 Err(anyhow!("Parameter not found: {parameter_name}"))
453 }
454
455 /// Looks for the parameter name as a key in the `parameters` hashmap and returns a mutable reference to it
456 /// Performs case-insensitive lookup for parameter names
457 /// # Errors
458 /// Will return `Err` if the parameter name is not found in the `parameters` hashmap
459 pub fn find_mutable_parameter(&mut self, parameter_name: &str) -> Result<&mut Parameter> {
460 // Try exact match first (fast path)
461 // Note: We need to check if the key exists as Arc<str>, so we iterate to find exact match
462 let exact_key = self
463 .parameters
464 .keys()
465 .find(|k| k.as_ref() == parameter_name)
466 .map(|k| k.clone());
467
468 if let Some(key) = exact_key {
469 return self
470 .parameters
471 .get_mut(&key)
472 .ok_or_else(|| anyhow!("Parameter not found: {parameter_name}"));
473 }
474
475 // Case-insensitive fallback: find the key first (clone Arc to avoid borrow issues)
476 let matching_key = self
477 .parameters
478 .keys()
479 .find(|key| key.eq_ignore_ascii_case(parameter_name))
480 .map(|k| k.clone());
481
482 if let Some(key) = matching_key {
483 return self
484 .parameters
485 .get_mut(&key)
486 .ok_or_else(|| anyhow!("Parameter not found: {parameter_name}"));
487 }
488
489 Err(anyhow!("Parameter not found: {parameter_name}"))
490 }
491
492 /// Returns a zero-copy reference to a Polars Float32Chunked view of a column for the parameter
493 ///
494 /// This provides access to the underlying Polars chunked array, which is useful
495 /// for operations that work directly with Polars types. For most use cases,
496 /// `get_parameter_events_slice()` is preferred as it provides a simple `&[f32]` slice.
497 ///
498 /// # Arguments
499 /// * `channel_name` - The channel name (e.g., "FSC-A", "FL1-A")
500 ///
501 /// # Errors
502 /// Will return `Err` if:
503 /// - the parameter name is not found in the parameters map
504 /// - the column data type is not Float32
505 pub fn get_parameter_events(&'_ self, channel_name: &str) -> Result<&Float32Chunked> {
506 Ok(self
507 .get_parameter_column(channel_name)?
508 .f32()
509 .map_err(|e| anyhow!("Parameter {} is not f32 type: {}", channel_name, e))?)
510 }
511 /// Get a reference to the Polars Column for a parameter by channel name
512 ///
513 /// This provides direct access to the underlying Polars column, which can be useful
514 /// for advanced operations that require the full Polars API.
515 ///
516 /// # Arguments
517 /// * `channel_name` - The channel name (e.g., "FSC-A", "FL1-A")
518 ///
519 /// # Errors
520 /// Will return `Err` if the parameter name is not found in the DataFrame
521 pub fn get_parameter_column(&'_ self, channel_name: &str) -> Result<&Column> {
522 self.data_frame
523 .column(channel_name)
524 .map_err(|e| anyhow!("Parameter {} not found: {}", channel_name, e))
525 }
526
527 /// Looks for the parameter name as a key in the 'parameters' hashmap and returns a new Vec<f32> of the raw event data
528 /// NOTE: This allocates a full copy of the events - prefer `get_parameter_events_slice` when possible
529 /// # Errors
530 /// Will return 'Err' if the parameter name is not found in the 'parameters hashmap or if the events are not found
531 pub fn get_parameter_events_as_owned_vec(&self, channel_name: &str) -> Result<Vec<EventDatum>> {
532 Ok(self.get_parameter_events_slice(channel_name)?.to_vec())
533 }
534
535 /// Returns the minimum and maximum values of the parameter
536 /// # Errors
537 /// Will return `Err` if the parameter name is not found in the 'parameters' hashmap or if the events are not found
538 pub fn get_minmax_of_parameter(&self, channel_name: &str) -> Result<(EventDatum, EventDatum)> {
539 let parameter = self.find_parameter(channel_name)?;
540 let events = self.get_parameter_events(¶meter.channel_name)?;
541
542 match events.iter().minmax() {
543 MinMaxResult::NoElements => Err(anyhow!("No elements found")),
544 MinMaxResult::OneElement(e) => Err(anyhow!("Only one element found: {:?}", e)),
545 MinMaxResult::MinMax(min, max) => Ok((min.unwrap(), max.unwrap())),
546 }
547 }
548
549 /// Creates a new `HashMap` of `Parameter`s
550 /// using the `Fcs` file's metadata to find the channel and label names from the `PnN` and `PnS` keywords.
551 /// Does NOT store events on the parameter.
552 /// # Errors
553 /// Will return `Err` if:
554 /// - the number of parameters cannot be found in the metadata,
555 /// - the parameter name cannot be found in the metadata,
556 /// - the parameter cannot be built (using the Builder pattern)
557 pub fn generate_parameter_map(metadata: &Metadata) -> Result<ParameterMap> {
558 let mut map = ParameterMap::default();
559 let number_of_parameters = metadata.get_number_of_parameters()?;
560 for parameter_number in 1..=*number_of_parameters {
561 let channel_name = metadata.get_parameter_channel_name(parameter_number)?;
562
563 // Use label name or fallback to the parameter name
564 let label_name = match metadata.get_parameter_label(parameter_number) {
565 Ok(label) => label,
566 Err(_) => channel_name,
567 };
568
569 let transform = if channel_name.contains("FSC")
570 || channel_name.contains("SSC")
571 || channel_name.contains("Time")
572 {
573 TransformType::Linear
574 } else {
575 TransformType::default()
576 };
577
578 // Get excitation wavelength from metadata if available
579 let excitation_wavelength = metadata
580 .get_parameter_excitation_wavelength(parameter_number)
581 .ok()
582 .flatten();
583
584 let parameter = ParameterBuilder::default()
585 // For the ParameterBuilder, ensure we're using the proper methods
586 // that may be defined by the Builder derive macro
587 .parameter_number(parameter_number)
588 .channel_name(channel_name)
589 .label_name(label_name)
590 .transform(transform)
591 .excitation_wavelength(excitation_wavelength)
592 .build()?;
593
594 // Add the parameter events to the hashmap keyed by the parameter name
595 map.insert(channel_name.to_string().into(), parameter);
596 }
597
598 Ok(map)
599 }
600
601 /// Looks for a keyword among the metadata and returns its value as a `&str`
602 /// # Errors
603 /// Will return `Err` if the `Keyword` is not found in the `metadata` or if the `Keyword` cannot be converted to a `&str`
604 pub fn get_keyword_string_value(&self, keyword: &str) -> Result<Cow<'_, str>> {
605 // TODO: This should be a match statement
606 if let Ok(keyword) = self.metadata.get_string_keyword(keyword) {
607 Ok(keyword.get_str())
608 } else if let Ok(keyword) = self.metadata.get_integer_keyword(keyword) {
609 Ok(keyword.get_str())
610 } else if let Ok(keyword) = self.metadata.get_float_keyword(keyword) {
611 Ok(keyword.get_str())
612 } else if let Ok(keyword) = self.metadata.get_byte_keyword(keyword) {
613 Ok(keyword.get_str())
614 } else if let Ok(keyword) = self.metadata.get_mixed_keyword(keyword) {
615 Ok(keyword.get_str())
616 } else {
617 Err(anyhow!("Keyword not found: {}", keyword))
618 }
619 }
620 /// A convenience function to return the `GUID` keyword from the `metadata` as a `&str`
621 /// # Errors
622 /// Will return `Err` if the `GUID` keyword is not found in the `metadata` or if the `GUID` keyword cannot be converted to a `&str`
623 pub fn get_guid(&self) -> Result<Cow<'_, str>> {
624 Ok(self.metadata.get_string_keyword("GUID")?.get_str())
625 }
626
627 /// Set or update the GUID keyword in the file's metadata
628 pub fn set_guid(&mut self, guid: String) {
629 self.metadata
630 .insert_string_keyword("GUID".to_string(), guid);
631 }
632
633 /// A convenience function to return the `$FIL` keyword from the `metadata` as a `&str`
634 /// # Errors
635 /// Will return `Err` if the `$FIL` keyword is not found in the `metadata` or if the `$FIL` keyword cannot be converted to a `&str`
636 pub fn get_fil_keyword(&self) -> Result<Cow<'_, str>> {
637 Ok(self.metadata.get_string_keyword("$FIL")?.get_str())
638 }
639
    /// A convenience function to return the `$TOT` keyword from the `metadata` as a `usize`
    /// (delegates straight to `Metadata::get_number_of_events`)
    /// # Errors
    /// Will return `Err` if the `$TOT` keyword is not found in the `metadata` or if the `$TOT` keyword cannot be converted to a `usize`
    pub fn get_number_of_events(&self) -> Result<&usize> {
        self.metadata.get_number_of_events()
    }
646
    /// A convenience function to return the `$PAR` keyword from the `metadata` as a `usize`
    /// (delegates straight to `Metadata::get_number_of_parameters`)
    /// # Errors
    /// Will return `Err` if the `$PAR` keyword is not found in the `metadata` or if the `$PAR` keyword cannot be converted to a `usize`
    pub fn get_number_of_parameters(&self) -> Result<&usize> {
        self.metadata.get_number_of_parameters()
    }
653
654 // ==================== NEW POLARS-BASED ACCESSOR METHODS ====================
655
656 /// Get events for a parameter as a slice of f32 values
657 /// Polars gives us direct access to the underlying buffer (zero-copy)
658 /// # Errors
659 /// Will return `Err` if:
660 /// - the parameter name is not found
661 /// - the Series data type is not Float32
662 /// - the data is chunked (rare for FCS files)
663 pub fn get_parameter_events_slice(&self, channel_name: &str) -> Result<&[f32]> {
664 self.get_parameter_events(channel_name)?
665 .cont_slice()
666 .map_err(|e| anyhow!("Parameter {} data is not contiguous: {}", channel_name, e))
667 }
668
669 /// Get two parameters as (x, y) pairs for plotting
670 /// Optimized for scatter plot use case with zero allocations until the collect
671 /// # Errors
672 /// Will return `Err` if either parameter name is not found
673 pub fn get_xy_pairs(&self, x_param: &str, y_param: &str) -> Result<Vec<(f32, f32)>> {
674 let x_data = self.get_parameter_events_slice(x_param)?;
675 let y_data = self.get_parameter_events_slice(y_param)?;
676
677 // Verify both parameters have the same length
678 if x_data.len() != y_data.len() {
679 return Err(anyhow!(
680 "Parameter length mismatch: {} has {} events, {} has {} events",
681 x_param,
682 x_data.len(),
683 y_param,
684 y_data.len()
685 ));
686 }
687
688 // Zip is zero-cost abstraction - uses iterators efficiently
689 Ok(x_data
690 .iter()
691 .zip(y_data.iter())
692 .map(|(&x, &y)| (x, y))
693 .collect())
694 }
695
    /// Get DataFrame height (number of events)
    ///
    /// Unlike `get_number_of_events`, this reads the count from the loaded
    /// DataFrame itself rather than the metadata, so it cannot fail.
    #[must_use]
    pub fn get_event_count_from_dataframe(&self) -> usize {
        self.data_frame.height()
    }
701
    /// Get DataFrame width (number of parameters)
    ///
    /// Unlike `get_number_of_parameters`, this reads the count from the loaded
    /// DataFrame itself rather than the metadata, so it cannot fail.
    #[must_use]
    pub fn get_parameter_count_from_dataframe(&self) -> usize {
        self.data_frame.width()
    }
707
708 /// Get DataFrame column names (parameter names)
709 pub fn get_parameter_names_from_dataframe(&self) -> Vec<String> {
710 self.data_frame
711 .get_column_names()
712 .into_iter()
713 .map(|s| s.to_string())
714 .collect()
715 }
716
717 /// Aggregate statistics for a parameter using Polars' streaming API for low memory usage and minimal, chunked passes.
718 ///
719 /// When streaming is enabled, Polars creates a *Pipeline*:
720 ///
721 /// **Source**: It pulls a chunk of data from the disk (e.g., 50,000 rows).
722 ///
723 /// **Operators**: It passes that chunk through your expressions (calculating the running sum, count, min, and max for that specific chunk).
724 ///
725 /// **Sink**: It aggregates the results from all chunks into a final result.
726 ///
727 /// Because the statistics we are calculating (min, max, mean) are *associative* and *commutative*, Polars can calculate them partially on each chunk and then combine them at the very end.
728 ///
729 /// Returns (min, max, mean, std_dev)
730 /// # Errors
731 /// Will return `Err` if the parameter is not found or stats calculation fails
732 pub fn get_parameter_statistics(&self, channel_name: &str) -> Result<(f32, f32, f32, f32)> {
733 let stats = (*self.data_frame)
734 .clone()
735 .lazy()
736 .select([
737 col(channel_name).min().alias("min"),
738 col(channel_name).max().alias("max"),
739 col(channel_name).mean().alias("mean"),
740 col(channel_name).std(1).alias("std"),
741 ])
742 .collect_with_engine(Engine::Streaming)?;
743 let min = stats
744 .column("min")
745 .unwrap()
746 .f32()?
747 .get(0)
748 .ok_or(anyhow!("No min found"))?;
749 let max = stats
750 .column("max")
751 .unwrap()
752 .f32()?
753 .get(0)
754 .ok_or(anyhow!("No max found"))?;
755 let mean = stats
756 .column("mean")
757 .unwrap()
758 .f32()?
759 .get(0)
760 .ok_or(anyhow!("No mean found"))?;
761 let std = stats
762 .column("std")
763 .unwrap()
764 .f32()?
765 .get(0)
766 .ok_or(anyhow!("No std deviation found"))?;
767
768 Ok((min, max, mean, std))
769 }
770
771 // ==================== TRANSFORMATION METHODS ====================
772
773 /// Apply arcsinh transformation to a parameter using Polars
774 /// This is the most common transformation for flow cytometry data
775 /// Formula: arcsinh(x / cofactor) / ln(10)
776 ///
777 /// # Arguments
778 /// * `parameter_name` - Name of the parameter to transform
779 /// * `cofactor` - Scaling factor (typical: 150-200 for modern instruments)
780 ///
781 /// # Returns
782 /// New DataFrame with the transformed parameter
783 pub fn apply_arcsinh_transform(
784 &self,
785 parameter_name: &str,
786 cofactor: f32,
787 ) -> Result<EventDataFrame> {
788 let df = (*self.data_frame).clone();
789
790 // Get the column to transform
791 let col = df
792 .column(parameter_name)
793 .map_err(|e| anyhow!("Parameter {} not found: {}", parameter_name, e))?;
794
795 let series = col.as_materialized_series();
796 let ca = series
797 .f32()
798 .map_err(|e| anyhow!("Parameter {} is not f32: {}", parameter_name, e))?;
799
800 // Apply arcsinh transformation: arcsinh(x / cofactor) / ln(10)
801 use rayon::prelude::*;
802 let ln_10 = 10_f32.ln();
803 let transformed: Vec<f32> = ca
804 .cont_slice()
805 .map_err(|e| anyhow!("Data not contiguous: {}", e))?
806 .par_iter()
807 .map(|&x| ((x / cofactor).asinh()) / ln_10)
808 .collect();
809
810 // Create new column with transformed data
811 let new_series = Series::new(parameter_name.into(), transformed);
812
813 // Replace the column in DataFrame
814 let mut new_df = df;
815 new_df
816 .replace(parameter_name, new_series)
817 .map_err(|e| anyhow!("Failed to replace column: {}", e))?;
818
819 Ok(Arc::new(new_df))
820 }
821
822 /// Apply arcsinh transformation to multiple parameters
823 ///
824 /// # Arguments
825 /// * `parameters` - List of (parameter_name, cofactor) pairs
826 ///
827 /// # Returns
828 /// New DataFrame with all specified parameters transformed
829 pub fn apply_arcsinh_transforms(&self, parameters: &[(&str, f32)]) -> Result<EventDataFrame> {
830 let mut df = (*self.data_frame).clone();
831
832 let ln_10 = 10_f32.ln();
833 use rayon::prelude::*;
834
835 for &(param_name, cofactor) in parameters {
836 let col = df
837 .column(param_name)
838 .map_err(|e| anyhow!("Parameter {} not found: {}", param_name, e))?;
839
840 let series = col.as_materialized_series();
841 let ca = series
842 .f32()
843 .map_err(|e| anyhow!("Parameter {} is not f32: {}", param_name, e))?;
844
845 let transformed: Vec<f32> = ca
846 .cont_slice()
847 .map_err(|e| anyhow!("Data not contiguous: {}", e))?
848 .par_iter()
849 .map(|&x| ((x / cofactor).asinh()) / ln_10)
850 .collect();
851
852 let new_series = Series::new(param_name.into(), transformed);
853 df.replace(param_name, new_series)
854 .map_err(|e| anyhow!("Failed to replace column {}: {}", param_name, e))?;
855 }
856
857 Ok(Arc::new(df))
858 }
859
860 /// Apply default arcsinh transformation to all fluorescence parameters
861 /// Automatically detects fluorescence parameters (excludes FSC, SSC, Time)
862 /// Uses cofactor = 200 (good default for modern instruments)
863 pub fn apply_default_arcsinh_transform(&self) -> Result<EventDataFrame> {
864 let param_names = self.get_parameter_names_from_dataframe();
865
866 // Filter to fluorescence parameters (exclude scatter and time)
867 let fluor_params: Vec<(&str, f32)> = param_names
868 .iter()
869 .filter(|name| {
870 let upper = name.to_uppercase();
871 !upper.contains("FSC") && !upper.contains("SSC") && !upper.contains("TIME")
872 })
873 .map(|name| (name.as_str(), 200.0)) // Default cofactor = 200
874 .collect();
875
876 self.apply_arcsinh_transforms(&fluor_params)
877 }
878
879 // ==================== COMPENSATION METHODS ====================
880
881 /// Extract compensation matrix from $SPILLOVER keyword
882 /// Returns (matrix, channel_names) if spillover keyword exists
883 /// Returns None if no spillover keyword is present in the file
884 ///
885 /// # Returns
886 /// Some((compensation_matrix, channel_names)) if spillover exists, None otherwise
887 ///
888 /// # Errors
889 /// Will return `Err` if spillover keyword is malformed
890 pub fn get_spillover_matrix(&self) -> Result<Option<(Array2<f32>, Vec<String>)>> {
891 use crate::keyword::{Keyword, MixedKeyword};
892
893 // Try to get the $SPILLOVER keyword
894 let spillover_keyword = match self.metadata.keywords.get("$SPILLOVER") {
895 Some(Keyword::Mixed(MixedKeyword::SPILLOVER {
896 n_parameters,
897 parameter_names,
898 matrix_values,
899 })) => (
900 *n_parameters,
901 parameter_names.clone(),
902 matrix_values.clone(),
903 ),
904 Some(_) => {
905 return Err(anyhow!("$SPILLOVER keyword exists but has wrong type"));
906 }
907 None => {
908 // No spillover keyword - this is fine, not all files have it
909 return Ok(None);
910 }
911 };
912
913 let (n_params, param_names, matrix_values): (usize, Vec<String>, Vec<f32>) =
914 spillover_keyword;
915
916 // Validate matrix dimensions
917 let expected_matrix_size = n_params * n_params;
918 if matrix_values.len() != expected_matrix_size {
919 return Err(anyhow!(
920 "SPILLOVER matrix size mismatch: expected {} values for {}x{} matrix, got {}",
921 expected_matrix_size,
922 n_params,
923 n_params,
924 matrix_values.len()
925 ));
926 }
927
928 // Create Array2 from matrix values
929 // FCS spillover is stored row-major order
930 let matrix = Array2::from_shape_vec((n_params, n_params), matrix_values)
931 .map_err(|e| anyhow!("Failed to create compensation matrix from SPILLOVER: {}", e))?;
932
933 Ok(Some((matrix, param_names)))
934 }
935
936 /// Check if this file has compensation information
937 #[must_use]
938 pub fn has_compensation(&self) -> bool {
939 self.get_spillover_matrix()
940 .map(|opt| opt.is_some())
941 .unwrap_or(false)
942 }
943
944 /// Apply compensation from the file's $SPILLOVER keyword
945 /// Convenience method that extracts spillover and applies it automatically
946 ///
947 /// # Returns
948 /// New DataFrame with compensated data, or error if no spillover keyword exists
949 pub fn apply_file_compensation(&self) -> Result<EventDataFrame> {
950 let (comp_matrix, channel_names) = self
951 .get_spillover_matrix()?
952 .ok_or_else(|| anyhow!("No $SPILLOVER keyword found in FCS file"))?;
953
954 let channel_refs: Vec<&str> = channel_names.iter().map(|s| s.as_str()).collect();
955
956 self.apply_compensation(&comp_matrix, &channel_refs)
957 }
958
    /// OPTIMIZED: Get compensated data for specific parameters only (lazy/partial compensation)
    ///
    /// This is 15-30x faster than apply_file_compensation when you only need a few parameters
    /// because it:
    /// - Only compensates the requested channels (e.g., 2 vs 30)
    /// - Uses sparse matrix optimization for matrices with >80% zeros
    /// - Bypasses compensation entirely for identity matrices
    ///
    /// # Arguments
    /// * `channels_needed` - Only the channel names you need compensated (typically 2 for a plot)
    ///
    /// # Returns
    /// HashMap of channel_name -> compensated data (as Vec<f32>). Requested
    /// channels that do not appear in the spillover matrix are silently omitted
    /// from the result; only an empty intersection is an error.
    ///
    /// # Errors
    /// Will return `Err` if the file has no `$SPILLOVER` keyword, if none of the
    /// requested channels appear in the matrix, if a channel's event data cannot
    /// be read, or if the (sub-)matrix cannot be inverted.
    ///
    /// # Performance
    /// - Dense matrix (2/30 channels): **15x faster** (150ms → 10ms)
    /// - Sparse matrix (90% sparse): **50x faster** (150ms → 3ms)
    /// - Identity matrix: **300x faster** (150ms → 0.5ms)
    pub fn get_compensated_parameters(
        &self,
        channels_needed: &[&str],
    ) -> Result<std::collections::HashMap<String, Vec<f32>>> {
        use std::collections::HashMap;

        // Get spillover matrix
        let (comp_matrix, matrix_channel_names) = self
            .get_spillover_matrix()?
            .ok_or_else(|| anyhow!("No $SPILLOVER keyword found in FCS file"))?;

        let n_events = self.get_event_count_from_dataframe();

        // OPTIMIZATION 1: Check if matrix is identity (no compensation needed)
        // Every entry is compared against 1.0 (diagonal) / 0.0 (off-diagonal)
        // with a 1e-6 tolerance; the `is_id` flag carries the inner `break`
        // out of the outer loop.
        let is_identity = {
            let mut is_id = true;
            for i in 0..comp_matrix.nrows() {
                for j in 0..comp_matrix.ncols() {
                    let expected = if i == j { 1.0 } else { 0.0 };
                    if (comp_matrix[[i, j]] - expected).abs() > 1e-6 {
                        is_id = false;
                        break;
                    }
                }
                if !is_id {
                    break;
                }
            }
            is_id
        };

        if is_identity {
            eprintln!("🚀 Identity matrix detected - bypassing compensation");
            // Just return original data
            let mut result = HashMap::new();
            for &channel in channels_needed {
                let data = self.get_parameter_events_slice(channel)?;
                result.insert(channel.to_string(), data.to_vec());
            }
            return Ok(result);
        }

        // OPTIMIZATION 2: Analyze sparsity
        // (fraction of coefficients whose magnitude is at or below 1e-6)
        let total_elements = comp_matrix.len();
        let non_zero_count = comp_matrix.iter().filter(|&&x| x.abs() > 1e-6).count();
        let sparsity = 1.0 - (non_zero_count as f64 / total_elements as f64);
        let is_sparse = sparsity > 0.8;

        eprintln!(
            "📊 Compensation matrix: {:.1}% sparse, {} non-zero coefficients",
            sparsity * 100.0,
            non_zero_count
        );

        // Find indices of channels we need (name -> row/column index in the matrix)
        let channel_indices: HashMap<&str, usize> = matrix_channel_names
            .iter()
            .enumerate()
            .map(|(i, name)| (name.as_str(), i))
            .collect();

        // Requested channels that are absent from the matrix are dropped here.
        let needed_indices: Vec<(String, usize)> = channels_needed
            .iter()
            .filter_map(|&ch| channel_indices.get(ch).map(|&idx| (ch.to_string(), idx)))
            .collect();

        if needed_indices.is_empty() {
            return Err(anyhow!(
                "None of the requested channels found in compensation matrix"
            ));
        }

        // Extract ONLY the channels involved in compensating our needed channels
        // For each needed channel, we need all channels that have non-zero spillover
        let mut involved_indices = std::collections::HashSet::new();
        for &(_, row_idx) in &needed_indices {
            // Add the channel itself
            involved_indices.insert(row_idx);

            // Add channels with non-zero spillover
            if is_sparse {
                for col_idx in 0..comp_matrix.ncols() {
                    if comp_matrix[[row_idx, col_idx]].abs() > 1e-6 {
                        involved_indices.insert(col_idx);
                    }
                }
            } else {
                // For dense matrix, we need all channels
                for i in 0..comp_matrix.ncols() {
                    involved_indices.insert(i);
                }
            }
        }

        // Sort for a deterministic channel order in the sub-matrix/data vectors.
        let mut involved_vec: Vec<usize> = involved_indices.into_iter().collect();
        involved_vec.sort_unstable();

        eprintln!(
            "🎯 Lazy compensation: loading {} channels (vs {} total)",
            involved_vec.len(),
            matrix_channel_names.len()
        );

        // Extract data for involved channels only
        let mut channel_data: Vec<Vec<f32>> = Vec::with_capacity(involved_vec.len());
        for &idx in &involved_vec {
            let channel_name = &matrix_channel_names[idx];
            let data = self.get_parameter_events_slice(channel_name)?;
            channel_data.push(data.to_vec());
        }

        // Extract sub-matrix for involved channels
        // NOTE(review): inverting this sub-matrix equals taking the matching
        // block of the full inverse only when the involved index set is closed
        // under spillover (always true on the dense path, which includes every
        // channel). On the sparse path it is an approximation — confirm this
        // is acceptable for the intended use.
        let sub_matrix = {
            let mut sub = Array2::<f32>::zeros((involved_vec.len(), involved_vec.len()));
            for (i, &orig_i) in involved_vec.iter().enumerate() {
                for (j, &orig_j) in involved_vec.iter().enumerate() {
                    sub[[i, j]] = comp_matrix[[orig_i, orig_j]];
                }
            }
            sub
        };

        // Invert sub-matrix
        use ndarray_linalg::Inverse;
        let comp_inv = sub_matrix
            .inv()
            .map_err(|e| anyhow!("Failed to invert compensation matrix: {:?}", e))?;

        // Compensate ONLY the involved channels:
        // result[i][e] = Σ_j inv[i][j] * channel_data[j][e], one rayon task per channel.
        use rayon::prelude::*;
        let compensated_data: Vec<Vec<f32>> = (0..involved_vec.len())
            .into_par_iter()
            .map(|i| {
                let row = comp_inv.row(i);
                let mut result = vec![0.0; n_events];

                for event_idx in 0..n_events {
                    let mut sum = 0.0;
                    for (j, &coeff) in row.iter().enumerate() {
                        sum += coeff * channel_data[j][event_idx];
                    }
                    result[event_idx] = sum;
                }

                result
            })
            .collect();

        // Build result HashMap for only the channels we need
        // (map each original matrix index back to its position in `involved_vec`)
        let mut result = HashMap::new();
        for (channel_name, orig_idx) in needed_indices {
            if let Some(local_idx) = involved_vec.iter().position(|&x| x == orig_idx) {
                result.insert(channel_name, compensated_data[local_idx].clone());
            }
        }

        eprintln!("🚀 Lazy compensation completed");
        Ok(result)
    }
1136
1137 /// Apply compensation matrix to the data using Polars
1138 /// Compensation corrects for spectral overlap between fluorescence channels
1139 ///
1140 /// # Arguments
1141 /// * `compensation_matrix` - 2D matrix where element [i,j] represents spillover from channel j into channel i
1142 /// * `channel_names` - Names of channels in the order they appear in the matrix
1143 ///
1144 /// # Returns
1145 /// New DataFrame with compensated fluorescence values
1146 ///
1147 /// # Example
1148 /// ```ignore
1149 /// // Create a 3x3 compensation matrix
1150 /// let comp_matrix = Array2::from_shape_vec((3, 3), vec![
1151 /// 1.0, 0.1, 0.05, // FL1-A compensation
1152 /// 0.2, 1.0, 0.1, // FL2-A compensation
1153 /// 0.1, 0.15, 1.0, // FL3-A compensation
1154 /// ]).unwrap();
1155 /// let channels = vec!["FL1-A", "FL2-A", "FL3-A"];
1156 /// let compensated = fcs.apply_compensation(&comp_matrix, &channels)?;
1157 /// ```
1158 pub fn apply_compensation(
1159 &self,
1160 compensation_matrix: &Array2<f32>,
1161 channel_names: &[&str],
1162 ) -> Result<EventDataFrame> {
1163 // Verify matrix dimensions match channel names
1164 let n_channels = channel_names.len();
1165 if compensation_matrix.nrows() != n_channels || compensation_matrix.ncols() != n_channels {
1166 return Err(anyhow!(
1167 "Compensation matrix dimensions ({}, {}) don't match number of channels ({})",
1168 compensation_matrix.nrows(),
1169 compensation_matrix.ncols(),
1170 n_channels
1171 ));
1172 }
1173
1174 // Extract data for channels to compensate
1175 let mut channel_data: Vec<Vec<f32>> = Vec::with_capacity(n_channels);
1176 let n_events = self.get_event_count_from_dataframe();
1177
1178 for &channel_name in channel_names {
1179 let data = self.get_parameter_events_slice(channel_name)?;
1180 channel_data.push(data.to_vec());
1181 }
1182
1183 // Apply compensation: compensated = original * inverse(compensation_matrix)
1184 // For efficiency, we pre-compute the inverse
1185 let comp_inv = compensation_matrix
1186 .inv()
1187 .map_err(|e| anyhow!("Failed to invert compensation matrix: {:?}", e))?;
1188
1189 // Perform matrix multiplication for each event
1190 use rayon::prelude::*;
1191 let compensated_data: Vec<Vec<f32>> = (0..n_channels)
1192 .into_par_iter()
1193 .map(|i| {
1194 let row = comp_inv.row(i);
1195 let mut result = vec![0.0; n_events];
1196
1197 for event_idx in 0..n_events {
1198 let mut sum = 0.0;
1199 for (j, &coeff) in row.iter().enumerate() {
1200 sum += coeff * channel_data[j][event_idx];
1201 }
1202 result[event_idx] = sum;
1203 }
1204
1205 result
1206 })
1207 .collect();
1208
1209 // Create new DataFrame with compensated values
1210 let mut df = (*self.data_frame).clone();
1211
1212 for (i, &channel_name) in channel_names.iter().enumerate() {
1213 let new_series = Series::new(channel_name.into(), compensated_data[i].clone());
1214 df.replace(channel_name, new_series)
1215 .map_err(|e| anyhow!("Failed to replace column {}: {}", channel_name, e))?;
1216 }
1217
1218 Ok(Arc::new(df))
1219 }
1220
    /// Apply spectral unmixing (similar to compensation but for spectral flow cytometry)
    /// Uses a good default cofactor of 200 for transformation before/after unmixing
    ///
    /// Pipeline: inverse-arcsinh each listed channel back to linear scale,
    /// apply the unmixing matrix via [`Self::apply_compensation`], then
    /// re-apply the arcsinh transform with the same cofactor.
    ///
    /// NOTE(review): the inverse transform assumes the stored values are
    /// currently on the scale `y = asinh(x / cofactor) / ln(10)`; calling this
    /// on linear-scale data would distort it — confirm the caller contract.
    ///
    /// # Arguments
    /// * `unmixing_matrix` - Matrix describing spectral signatures of fluorophores
    /// * `channel_names` - Names of spectral channels
    /// * `cofactor` - Cofactor for arcsinh transformation (default: 200)
    ///
    /// # Returns
    /// New DataFrame with unmixed and transformed fluorescence values
    ///
    /// # Errors
    /// Will return `Err` if a channel is missing, is not f32, is not
    /// contiguous in memory, or if the unmixing/transform steps fail.
    pub fn apply_spectral_unmixing(
        &self,
        unmixing_matrix: &Array2<f32>,
        channel_names: &[&str],
        cofactor: Option<f32>,
    ) -> Result<EventDataFrame> {
        let cofactor = cofactor.unwrap_or(200.0);

        // First, inverse-transform the data (go back to linear scale)
        // ln(10) is hoisted out of the per-event closure below.
        let ln_10 = 10_f32.ln();
        let mut df = (*self.data_frame).clone();

        use rayon::prelude::*;
        for &channel_name in channel_names {
            let col = df
                .column(channel_name)
                .map_err(|e| anyhow!("Parameter {} not found: {}", channel_name, e))?;

            let series = col.as_materialized_series();
            let ca = series
                .f32()
                .map_err(|e| anyhow!("Parameter {} is not f32: {}", channel_name, e))?;

            // Inverse arcsinh: x = cofactor * sinh(y * ln(10))
            let linear: Vec<f32> = ca
                .cont_slice()
                .map_err(|e| anyhow!("Data not contiguous: {}", e))?
                .par_iter()
                .map(|&y| cofactor * (y * ln_10).sinh())
                .collect();

            let new_series = Series::new(channel_name.into(), linear);
            df.replace(channel_name, new_series)
                .map_err(|e| anyhow!("Failed to replace column: {}", e))?;
        }

        // Apply unmixing matrix (same as compensation)
        // A temporary Fcs wrapping the linearized frame lets us reuse
        // apply_compensation for the matrix application.
        let df_with_linear = Arc::new(df);
        let fcs_temp = Fcs {
            data_frame: df_with_linear,
            ..self.clone()
        };
        let unmixed = fcs_temp.apply_compensation(unmixing_matrix, channel_names)?;

        // Re-apply arcsinh transformation on another temporary wrapper so the
        // returned frame is back on the transformed scale.
        let fcs_unmixed = Fcs {
            data_frame: unmixed,
            ..self.clone()
        };

        let params_with_cofactor: Vec<(&str, f32)> =
            channel_names.iter().map(|&name| (name, cofactor)).collect();

        fcs_unmixed.apply_arcsinh_transforms(&params_with_cofactor)
    }
1286}