use std::{
cmp::Ordering, collections::HashMap, io, sync::mpsc::{Receiver, Sender, SyncSender, TryRecvError}, time::Duration
};
use mzpeaks::{CentroidLike, DeconvolutedCentroidLike};
use crate::prelude::*;
use super::MultiLayerSpectrum;
#[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq)]
pub enum HasIonMobility {
#[default]
None = 0,
Point = 1,
Dimension = 2
}
impl PartialOrd for HasIonMobility {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for HasIonMobility {
fn cmp(&self, other: &Self) -> Ordering {
match self {
HasIonMobility::None => {
match other {
HasIonMobility::None => Ordering::Equal,
_ => Ordering::Less
}
},
HasIonMobility::Point => {
match other {
HasIonMobility::None => Ordering::Greater,
HasIonMobility::Point => Ordering::Equal,
HasIonMobility::Dimension => Ordering::Less,
}
},
HasIonMobility::Dimension => {
match other {
HasIonMobility::Dimension => Ordering::Equal,
_ => Ordering::Greater
}
},
}
}
}
#[derive(Debug)]
pub struct Collator<T: Send> {
pub waiting: HashMap<usize, T, identity_hash::BuildIdentityHasher<usize>>,
pub next_key: usize,
pub ticks: usize,
pub done: bool,
}
impl<T: Send> Default for Collator<T> {
fn default() -> Self {
Self {
waiting: Default::default(),
next_key: Default::default(),
ticks: Default::default(),
done: Default::default(),
}
}
}
impl<T: Send> Collator<T> {
pub fn receive(&mut self, group_idx: usize, group: T) {
self.waiting.insert(group_idx, group);
}
pub fn receive_from(&mut self, receiver: &Receiver<(usize, T)>, batch_size: usize) -> bool {
self.receive_from_timeout(receiver, batch_size, Duration::from_micros(1))
}
pub fn receive_from_timeout(
&mut self,
receiver: &Receiver<(usize, T)>,
batch_size: usize,
timeout: Duration,
) -> bool {
let mut counter = 0usize;
while let Ok((group_idx, group)) = receiver.recv_timeout(timeout) {
self.receive(group_idx, group);
counter += 1;
if counter > batch_size {
return true;
}
}
false
}
pub fn receive_from_map_timeout<U, F: Fn(usize, U) -> (usize, T)>(
&mut self,
receiver: &Receiver<(usize, U)>,
batch_size: usize,
timeout: Duration,
cb: F,
) {
let mut counter = 0usize;
while let Ok((group_idx, group)) = receiver.recv_timeout(timeout) {
let (group_idx, group) = cb(group_idx, group);
self.receive(group_idx, group);
counter += 1;
if counter > batch_size {
break;
}
}
}
pub fn receive_from_map_iter_timeout<
U,
I: Iterator<Item = (usize, T)>,
F: Fn(usize, U) -> I,
>(
&mut self,
receiver: &Receiver<(usize, U)>,
batch_size: usize,
timeout: Duration,
cb: F,
) {
let mut counter = 0usize;
while let Ok((group_idx, group)) = receiver.recv_timeout(timeout) {
self.receive_map_iter(group_idx, group, &cb);
counter += 1;
if counter > batch_size {
break;
}
}
}
pub fn receive_map<U, F: Fn(usize, U) -> (usize, T)>(
&mut self,
group_idx: usize,
group: U,
cb: F,
) {
let (group_idx, group) = cb(group_idx, group);
self.receive(group_idx, group);
}
pub fn receive_map_iter<U, I: Iterator<Item = (usize, T)>, F: Fn(usize, U) -> I>(
&mut self,
group_idx: usize,
group: U,
cb: F,
) {
cb(group_idx, group).for_each(|(i, x)| {
self.receive(i, x);
})
}
pub fn has_next(&self) -> bool {
self.waiting.contains_key(&self.next_key)
}
pub fn try_next(&mut self) -> Option<(usize, T)> {
self.waiting.remove_entry(&self.next_key).inspect(|_| {
self.next_key += 1;
})
}
pub fn set_next_key(&mut self, key: usize) {
self.next_key = key
}
pub fn collate_sync(receiver: Receiver<(usize, T)>, sender: SyncSender<(usize, T)>) {
let mut collator = Self::default();
loop {
match receiver.try_recv() {
Ok((group_idx, group)) => {
collator.receive(group_idx, group);
collator.receive_from(&receiver, 100);
}
Err(e) => match e {
TryRecvError::Empty => {}
TryRecvError::Disconnected => {
collator.done = true;
break;
}
},
}
while let Some((group_idx, group)) = collator.try_next() {
match sender.send((group_idx, group)) {
Ok(()) => {}
Err(e) => {
log::error!("Failed to send {group_idx} for writing: {e}")
}
}
}
}
}
pub fn collate(receiver: Receiver<(usize, T)>, sender: Sender<(usize, T)>) {
let mut collator = Self::default();
loop {
match receiver.try_recv() {
Ok((group_idx, group)) => {
collator.receive(group_idx, group);
collator.receive_from(&receiver, 100);
}
Err(e) => match e {
TryRecvError::Empty => {}
TryRecvError::Disconnected => {
collator.done = true;
break;
}
},
}
while let Some((group_idx, group)) = collator.try_next() {
match sender.send((group_idx, group)) {
Ok(()) => {}
Err(e) => {
log::error!("Failed to send {group_idx} for writing: {e}")
}
}
}
}
}
}
impl<
C: CentroidLike + Send + BuildArrayMapFrom + BuildFromArrayMap + Clone,
D: DeconvolutedCentroidLike + Send + BuildArrayMapFrom + BuildFromArrayMap + Clone,
> SpectrumWriter<C, D> for Collator<MultiLayerSpectrum<C, D>>
{
fn write<S: SpectrumLike<C, D> + 'static>(&mut self, spectrum: &S) -> std::io::Result<usize> {
let k = spectrum.index();
let peaks = spectrum.peaks().cloned();
let descr = spectrum.description().clone();
let t = MultiLayerSpectrum::from_peaks_data_levels_and_description(peaks, descr);
self.receive(k, t);
Ok(k)
}
fn write_owned<S: SpectrumLike<C, D> + 'static>(
&mut self,
spectrum: S,
) -> std::io::Result<usize> {
let k = spectrum.index();
let (peaks, description) = spectrum.into_peaks_and_description();
let t = MultiLayerSpectrum::from_peaks_data_levels_and_description(peaks, description);
self.receive(k, t);
Ok(k)
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
fn close(&mut self) -> std::io::Result<()> {
Ok(())
}
}
impl<
C: CentroidLike + Send + BuildArrayMapFrom + BuildFromArrayMap + Clone,
D: DeconvolutedCentroidLike + Send + BuildArrayMapFrom + BuildFromArrayMap + Clone,
> SpectrumWriter<C, D> for Sender<MultiLayerSpectrum<C, D>> {
fn write<S: SpectrumLike<C, D> + 'static>(&mut self, spectrum: &S) -> std::io::Result<usize> {
let k = spectrum.index();
let peaks = spectrum.peaks().cloned();
let descr = spectrum.description().clone();
let t = MultiLayerSpectrum::from_peaks_data_levels_and_description(peaks, descr);
match self.send(t) {
Ok(_) => {Ok(k)},
Err(e) => {
Err(
io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
)
},
}
}
fn write_owned<S: SpectrumLike<C, D> + 'static>(
&mut self,
spectrum: S,
) -> std::io::Result<usize> {
let k = spectrum.index();
let (peaks, description) = spectrum.into_peaks_and_description();
let t = MultiLayerSpectrum::from_peaks_data_levels_and_description(peaks, description);
match self.send(t) {
Ok(_) => {Ok(k)},
Err(e) => {
Err(
io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
)
},
}
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
fn close(&mut self) -> std::io::Result<()> {
Ok(())
}
}
impl<
C: CentroidLike + Send + BuildArrayMapFrom + BuildFromArrayMap + Clone,
D: DeconvolutedCentroidLike + Send + BuildArrayMapFrom + BuildFromArrayMap + Clone,
> SpectrumWriter<C, D> for SyncSender<MultiLayerSpectrum<C, D>> {
fn write<S: SpectrumLike<C, D> + 'static>(&mut self, spectrum: &S) -> std::io::Result<usize> {
let k = spectrum.index();
let peaks = spectrum.peaks().cloned();
let descr = spectrum.description().clone();
let t = MultiLayerSpectrum::from_peaks_data_levels_and_description(peaks, descr);
match self.send(t) {
Ok(_) => {Ok(k)},
Err(e) => {
Err(
io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
)
},
}
}
fn write_owned<S: SpectrumLike<C, D> + 'static>(
&mut self,
spectrum: S,
) -> std::io::Result<usize> {
let k = spectrum.index();
let (peaks, description) = spectrum.into_peaks_and_description();
let t = MultiLayerSpectrum::from_peaks_data_levels_and_description(peaks, description);
match self.send(t) {
Ok(_) => {Ok(k)},
Err(e) => {
Err(
io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
)
},
}
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
fn close(&mut self) -> std::io::Result<()> {
Ok(())
}
}