use crate::averager::{AveragedFrame, FrameToAverage};
use crate::dispatcher::BubbleError::AlreadyProcessed;
use crate::dispatcher::Task::NotYet;
use crate::utils::{
py_stamp, Compressor, Decompressor, EntryProcesser, MultiColumnWalker, Resizer, Texter,
WalkResult, KEY_DONT_TOUCH, KEY_DUBBLE_MONITOR, KEY_DUBBLE_PHOTO,
};
use crate::{notifier, ChDispatcher, IType};
use crossbeam_channel::{unbounded, Receiver, Sender};
use cryiorust::cbf::CbfFrame;
use cryiorust::edf::{DataType, EdfFrame};
use cryiorust::frame::{self, Frame, FrameError, Header, HeaderEntry};
use cryiorust::{cbf, edf};
use integrustio::distortion::Distortion;
use integrustio::integrator::{
Diffractogram, Integrable, IntegrationType, Integrator, Pattern, PatternType, Units,
};
use integrustio::poni::Poni;
use integrustio::spline::Spline;
use itertools::Itertools;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use serde;
use serde::export::Formatter;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fs::{metadata, File, ReadDir};
use std::io::{self, BufWriter, Cursor, Write};
#[cfg(not(target_os = "windows"))]
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::Arc;
use std::{fmt, fs, mem, thread};
/// Integration settings as deserialized from a client request.
///
/// Every field carries `#[serde(default)]`, so any of them may be absent in
/// the incoming document; the `rename` attributes give the key names used on
/// the wire. The values are consumed by `Integration::parse_settings`.
#[derive(serde::Deserialize)]
pub struct Settings {
/// Non-zero requests that processing be started (honoured only when no
/// errors were collected while parsing).
#[serde(default)]
run: i32,
/// Non-zero requests that processing be stopped.
#[serde(default)]
stop: i32,
/// Directory to process; must exist and be a directory.
#[serde(default)]
path: Option<String>,
/// Poni description: first tried through `decompress()`, then as a file name.
#[serde(default)]
poni: Option<String>,
/// Output sub-directory; only applied together with `path`.
#[serde(default)]
subdir: Option<String>,
#[serde(default, rename = "solidangle")]
solid_angle: Option<bool>,
#[serde(default)]
polarization: Option<f64>,
/// Factor the image is multiplied by (or the pattern is scaled with when
/// normalizing on background).
#[serde(default)]
calibration: Option<f64>,
/// `"t"`, `"q"` or `"a"`; anything else falls back to two-theta.
#[serde(default)]
units: Option<String>,
/// `"Monitor"`, `"Bkg"`/`"Background"` or `"Median"`.
#[serde(default)]
normalization: Option<String>,
/// Two-element range used for background normalization.
#[serde(default, rename = "normrange")]
norm_range: Option<Vec<f64>>,
/// `"Dubble"`, `"SNBL"` or `"None"`.
#[serde(default)]
beamline: Option<String>,
#[serde(default, rename = "bins")]
radial_bins: Option<usize>,
#[serde(default, rename = "abins")]
azimuthal_bins: Option<usize>,
#[serde(default)]
thickness: Option<f64>,
#[serde(default)]
concentration: Option<f64>,
/// Output file extension; a leading dot is removed, empty means `dat`.
#[serde(default, rename = "ext")]
extension: Option<String>,
#[serde(default)]
speed: Option<bool>,
#[serde(default)]
super_speed: Option<bool>,
/// Coefficient passed to the background averaging (defaults to 1.0).
#[serde(default, rename = "bkgCoef")]
bkg_coef: Option<f64>,
/// Names of the background frames to average.
#[serde(default, rename = "backgroundFiles")]
bkg_names: Option<Vec<String>>,
/// `""`/`"Pilatus"` or `"Frelon"`.
#[serde(default)]
detector: Option<String>,
/// Dark frames for the background (only used with the Frelon detector).
#[serde(default, rename = "darkBackground")]
dark_bkg_names: Option<Vec<String>>,
/// Dark frames for the sample (only used with the Frelon detector).
#[serde(default, rename = "darkSample")]
dark_sample_names: Option<Vec<String>>,
/// Spline description (only used with the Frelon detector).
#[serde(default)]
spline: Option<String>,
/// Mask payload; it is passed through `decompress()`.
#[serde(default, rename = "maskFile")]
mask: Option<String>,
/// Flood frame file name (only used with the Frelon detector).
#[serde(default)]
flood: Option<String>,
#[serde(default, rename = "save")]
save_edf: Option<bool>,
// Not read anywhere in the visible part of this file.
#[serde(default)]
_incidence: Option<bool>,
#[serde(default, rename = "multicolumnName")]
multi_column: Option<String>,
/// Enables (`true`) or resets (`false`) the azimuthal range.
#[serde(default, rename = "azimuthChecked")]
use_azimuth: Option<bool>,
#[serde(default)]
azimuth: Option<Vec<f64>>,
/// Enables (`true`) or resets (`false`) the radial range.
#[serde(default, rename = "useRadial")]
use_radial: Option<bool>,
#[serde(default)]
radial: Option<Vec<f64>>,
#[serde(default, rename = "azimuthSlices")]
azimuth_slices: Option<bool>,
#[serde(default, rename = "i_azimuth")]
azimuthal: Option<bool>,
#[serde(default, rename = "i_cake")]
cake: Option<bool>,
/// Number of frames to average; values below 2 disable averaging.
#[serde(default, rename = "averageNFrames")]
average: Option<usize>,
}
/// A settings request addressed to one dispatcher.
pub struct ISettings {
/// Integration type the request is meant for; a dispatcher of another type
/// ignores the request.
pub itype: IType,
/// Channel on which the dispatcher sends its `Response`.
pub chan: Sender<Response>,
/// The settings to apply.
pub settings: Settings,
}
/// Entry point of a dispatcher: consumes `self` and serves requests for the
/// given integration type.
pub trait Dispatcher {
/// Runs the dispatcher loop for `itype`.
fn run(self, itype: IType);
}
impl Dispatcher for ChDispatcher {
/// Creates the internal `Dispatch` state, starts the notifier thread and
/// then loops forever multiplexing the channels with `select!`.
///
/// The loop has no exit branch, so this function does not return.
fn run(self, itype: IType) {
debug!("Dispatcher for {} integration has started", itype);
let mut dispatcher = Dispatch::new(itype);
// Most recent result; handed to a client once and then cleared.
let mut results = None;
let (start, stop, event) = (
dispatcher.ch_recv_start.clone(),
dispatcher.ch_recv_stop.clone(),
dispatcher.ch_send_event.clone(),
);
thread::spawn(move || notifier::run(start, stop, event));
loop {
select! {
// A worker finished one result.
recv(dispatcher.ch_recv_results) -> msg => {
if let Ok(r) = msg {
results = Some(r);
dispatcher.done += 1;
}
}
// New settings: answered only when they target this integration type.
recv(self.ch_recv_settings) -> msg => {
if let Ok(s) = msg {
if let Some(state) = dispatcher.parse_settings(s.settings, &s.itype) {
let _ = s.chan.send(Response {
state,
results,
done: dispatcher.done,
total: dispatcher.total,
});
results = None;
}
}
}
// State request: reply with the current state and counters.
recv(self.ch_wait_req_state) -> msg => {
if let Ok(ch) = msg {
let _ = ch.send(Response {
state: dispatcher.state(),
results,
done: dispatcher.done,
total: dispatcher.total,
});
results = None;
}
}
// Adjustment of the total counter (may be negative).
recv(dispatcher.ch_recv_n) -> msg => {
if let Ok(n) = msg {
dispatcher.total += n;
}
}
// A path reported by the notifier.
recv(dispatcher.ch_recv_event) -> msg => {
if let Ok(path) = msg {
dispatcher.process_event(path);
}
}
}
}
}
}
/// Detector the processed frames come from.
#[derive(PartialEq)]
enum Detector {
    Pilatus,
    Frelon,
}

impl Detector {
    /// Returns `true` only for the Frelon detector.
    fn is_frelon(&self) -> bool {
        match self {
            Detector::Frelon => true,
            Detector::Pilatus => false,
        }
    }
}
/// Internal state of a running dispatcher.
struct Dispatch {
/// Shared integration state, also handed to worker closures.
integration: Arc<RwLock<Integration>>,
/// Number of frames seen so far (adjusted through `ch_recv_n`).
total: isize,
/// Number of results received so far.
done: isize,
ch_recv_results: Receiver<Results>,
ch_recv_n: Receiver<isize>,
/// Start/stop/event channels; the receiving start/stop ends and the sending
/// event end are cloned for `notifier::run`.
ch_send_start: Sender<String>,
ch_recv_start: Receiver<String>,
ch_send_stop: Sender<Arc<String>>,
ch_recv_stop: Receiver<Arc<String>>,
ch_send_event: Sender<PathBuf>,
ch_recv_event: Receiver<PathBuf>,
/// Per-directory, per-extension accumulation of paths awaiting averaging.
dict: HashMap<Rc<Path>, HashMap<Rc<OsStr>, Vec<PathBuf>>>,
/// Copy of `Integration::average` taken when settings are parsed.
average: usize,
}
impl Dispatch {
/// Builds a dispatcher state with fresh unbounded channels and an idle
/// `Integration` of the given type.
fn new(itype: IType) -> Dispatch {
    let (results_tx, results_rx) = unbounded();
    let (count_tx, count_rx) = unbounded();
    let (start_tx, start_rx) = unbounded();
    let (stop_tx, stop_rx) = unbounded();
    let (event_tx, event_rx) = unbounded();
    // The integration keeps the sending ends for results and counter updates.
    let integration = Integration::new(itype, results_tx, count_tx);
    Dispatch {
        integration: Arc::new(RwLock::new(integration)),
        total: 0,
        done: 0,
        ch_recv_results: results_rx,
        ch_recv_n: count_rx,
        ch_send_start: start_tx,
        ch_recv_start: start_rx,
        ch_send_stop: stop_tx,
        ch_recv_stop: stop_rx,
        ch_send_event: event_tx,
        ch_recv_event: event_rx,
        dict: HashMap::new(),
        average: 0,
    }
}
/// Handles one path reported by the notifier.
///
/// The total counter is always incremented. The path is then accumulated in
/// the per-directory map and, once a task is ready, it is spawned on the
/// rayon pool. Paths without a parent directory are not processed further.
fn process_event(&mut self, path: PathBuf) {
    self.total += 1;
    let dir: Rc<Path> = match path.parent() {
        Some(dir) => Rc::from(dir),
        None => return,
    };
    let average = self.average;
    let task = self
        .dict
        .entry(dir)
        .or_insert_with(HashMap::new)
        .accumulate(path, average);
    if task.is_not_yet() {
        return;
    }
    let integration = self.integration.clone();
    rayon::spawn(move || task.integrate(integration));
}
/// Applies `s` to the shared integration and reacts to a change of the
/// running state.
///
/// Returns `None` when `itype` differs from the integration's own type,
/// otherwise the new `DispatcherState` with the collected messages.
fn parse_settings(&mut self, s: Settings, itype: &IType) -> Option<DispatcherState> {
// Read lock only: check the type and remember the previous running flag.
let was_running = {
let i = self.integration.read();
if i.itype != *itype {
return None;
}
i.running
};
// Write lock scoped to this block so it is released before the code below
// takes read locks again.
let (errors, warnings, running) = {
let mut i = self.integration.write();
let (errors, warnings) = i.parse_settings(s);
self.average = i.average;
(errors, warnings, i.is_running())
};
if running {
if !was_running {
// Fresh start: forget accumulated paths and counters, announce the path.
self.dict.clear();
self.total = 0;
self.done = 0;
let path = &self.integration.read().path;
let _ = self.ch_send_start.send(path.clone());
}
// NOTE(review): a directory walk is spawned on every settings update while
// running, not only on the first start — confirm this is intended.
let i = self.integration.clone();
thread::spawn(move || i.walker());
} else {
if was_running {
// Just stopped: announce it and, if a multi-column name is configured,
// run the multi-column walk over the path.
let integration = self.integration.read();
let path = Arc::new(integration.path.clone());
let _ = self.ch_send_stop.send(path.clone());
if !integration.multi_column.is_empty() {
let name = integration.multi_column.clone();
let ext = integration.extension.clone();
rayon::spawn(move || path.as_ref().walk(name, ext));
}
}
}
Some(DispatcherState {
integration: self.integration.clone(),
errors,
warnings,
})
}
fn state(&self) -> DispatcherState {
DispatcherState {
integration: self.integration.clone(),
errors: vec![],
warnings: vec![],
}
}
}
/// State reported back to a client.
pub struct DispatcherState {
/// Handle to the shared integration state.
pub integration: Arc<RwLock<Integration>>,
/// Error messages collected while parsing settings.
pub errors: Vec<String>,
/// Warning messages collected while parsing settings.
pub warnings: Vec<String>,
}
/// Reply sent by the dispatcher to settings and state requests.
pub struct Response {
pub state: DispatcherState,
/// Latest result not yet handed out, if any.
pub results: Option<Results>,
/// Dispatcher's `total` counter at the time of the reply.
pub total: isize,
/// Dispatcher's `done` counter at the time of the reply.
pub done: isize,
}
/// One integration result together with what is needed to name and save it.
pub struct Results {
/// Path of the source frame (filled in by `Task::integrate`).
pub path: Arc<PathBuf>,
/// Output file name (built from `path`, `subdir` and the pattern type).
pub name: PathBuf,
diffractogram: Option<Diffractogram>,
/// Value of `py_stamp()` taken when the result was created.
pub timestamp: f64,
pub transmission: f64,
subdir: Arc<PathBuf>,
ext: Arc<String>,
save_edf: bool,
/// Azimuthal range of the slice; set only for azimuthal-slice results.
azimuth: Option<[f64; 2]>,
}
/// Complete configuration and working state of one integration pipeline.
///
/// It is shared behind `Arc<RwLock<..>>` between the dispatcher, the directory
/// walker and the worker tasks.
pub struct Integration {
i: Integrator,
subdir: Arc<PathBuf>,
itype: IType,
running: bool,
path: String,
/// Sending ends used by workers to report results and counter updates.
ch_send_results: Sender<Results>,
ch_send_n: Sender<isize>,
beamline: Beamline,
thickness: f64,
concentration: f64,
extension: Arc<String>,
speed: bool,
super_speed: bool,
detector: Detector,
/// Averaged dark frames (Frelon only).
dark_bkg: Option<AveragedFrame>,
dark_sample: Option<AveragedFrame>,
spline: Option<Spline>,
/// Distortion correction; re-created whenever the spline changes.
d: Distortion,
/// Last bin counts pushed to the integrator.
r_bins: usize,
a_bins: usize,
save_edf: bool,
flood: Option<Box<dyn Frame>>,
norm: Normalization,
norm_range: [f64; 2],
calibration: f64,
bkg: Option<AveragedFrame>,
multi_column: Arc<String>,
/// Azimuthal range actually passed to the integrator.
azimuth: [f64; 2],
/// Azimuthal range as given by the user (sorted).
user_azimuth: [f64; 2],
radial: [f64; 2],
azimuth_slices: bool,
azimuthal: bool,
cake: bool,
/// Bit mask, one bit per pixel; set bits mark pixels overwritten with -1.
mask: Option<Vec<u8>>,
/// Number of frames to average; below 2 every frame is processed alone.
average: usize,
}
/// Beamline selected in the settings; it affects how azimuthal ranges are
/// converted and how azimuthal patterns are post-processed.
pub enum Beamline {
None,
Dubble,
SNBL,
}
impl Integration {
/// Creates an idle integration of the given type with default parameters:
/// Pilatus detector, `dat` extension, calibration 1.0, no normalization and
/// all ranges zeroed.
fn new(
    itype: IType,
    ch_send_results: Sender<Results>,
    ch_send_n: Sender<isize>,
) -> Integration {
    let no_range = [0., 0.];
    Integration {
        i: Integrator::new(),
        subdir: Arc::new(PathBuf::new()),
        itype,
        running: false,
        path: String::new(),
        ch_send_results,
        ch_send_n,
        beamline: Beamline::None,
        thickness: 0.0,
        concentration: 0.0,
        extension: Arc::new(String::from("dat")),
        speed: false,
        super_speed: false,
        detector: Detector::Pilatus,
        dark_bkg: None,
        dark_sample: None,
        spline: None,
        d: Distortion::new(),
        r_bins: 0,
        a_bins: 0,
        save_edf: false,
        flood: None,
        norm: Normalization::None,
        norm_range: no_range,
        calibration: 1.0,
        bkg: None,
        multi_column: Arc::new(String::new()),
        azimuth: no_range,
        user_azimuth: no_range,
        radial: no_range,
        azimuth_slices: false,
        azimuthal: false,
        cake: false,
        mask: None,
        average: 0,
    }
}
/// Integration type this instance was created for.
pub fn itype(&self) -> &IType {
&self.itype
}
/// Currently configured data directory (empty until one has been set).
pub fn path(&self) -> &str {
    self.path.as_str()
}
/// Applies every field of `s` to this integration.
///
/// Problems with the path, the poni description or the detector name are
/// collected as errors; problems with optional inputs (normalization range,
/// mask, spline, dark frames, flood, background) are collected as warnings.
/// Processing is started only when `s.run` is non-zero and no errors were
/// collected; a non-zero `s.stop` stops it afterwards.
///
/// Returns `(errors, warnings)`, in that order, which is the order the
/// dispatcher destructures the tuple in.
fn parse_settings(&mut self, s: Settings) -> (Vec<String>, Vec<String>) {
    let mut errors = vec![];
    let mut warnings = vec![];
    if let Some(err) = self.set_path(s.path, s.subdir) {
        errors.push(err);
    }
    if let Some(err) = self.set_poni(s.poni) {
        errors.push(err);
    }
    self.set_solid_angle(s.solid_angle);
    self.set_units(s.units);
    self.set_polarization(s.polarization);
    self.set_calibration(s.calibration);
    self.set_beamline(s.beamline);
    self.set_radial_bins(s.radial_bins);
    self.set_azimuthal_bins(s.azimuthal_bins);
    self.set_thickness(s.thickness);
    self.set_average(s.average);
    self.set_concentration(s.concentration);
    self.set_extension(s.extension);
    self.set_speed(s.speed);
    self.set_super_speed(s.super_speed);
    self.set_save_edf(s.save_edf);
    self.set_multi_column(s.multi_column);
    // The azimuthal range conversion depends on the beamline and on the
    // slices flag, so both are applied before `set_azimuth`.
    self.set_azimuth_slices(s.azimuth_slices);
    self.set_azimuth(s.azimuth, s.use_azimuth);
    self.set_radial(s.radial, s.use_radial);
    self.set_azimuthal_integration(s.azimuthal);
    self.set_cake(s.cake);
    if let Some(err) = self.set_normalization(s.normalization, s.norm_range) {
        warnings.push(err);
    }
    if let Some(err) = self.set_mask(s.mask) {
        warnings.push(err);
    }
    // The detector must be known before spline, dark frames and flood are
    // parsed: those are only kept for the Frelon detector.
    if let Some(err) = self.set_detector(s.detector) {
        errors.push(err);
    }
    if let Some(err) = self.parse_spline(s.spline) {
        warnings.push(err);
    }
    if let Some(mut errs) = self.set_dark_background(s.dark_bkg_names) {
        warnings.append(&mut errs);
    }
    if let Some(mut errs) = self.set_dark_sample(s.dark_sample_names) {
        warnings.append(&mut errs);
    }
    if let Some(err) = self.set_flood(s.flood) {
        warnings.push(err);
    }
    // The background uses the dark background and the normalization mode set above.
    if let Some(mut errs) = self.set_background(s.bkg_coef, s.bkg_names) {
        warnings.append(&mut errs);
    }
    if s.run != 0 && errors.is_empty() {
        self.run();
    }
    if s.stop != 0 {
        self.stop();
    }
    // Errors first: the previous `(warnings, errors)` order made the caller
    // report warnings as errors and vice versa.
    (errors, warnings)
}
/// Converts an azimuthal range into the convention selected by the beamline
/// and integration type, then wraps the bounds: the lower one into
/// `[0, 360)`, the upper one into `(0, 360]`.
fn convert_azimuth(&self, azimuth: &[f64; 2]) -> [f64; 2] {
    let [from, to] = *azimuth;
    let (mut lower, mut upper) = match (&self.beamline, &self.itype) {
        // SNBL: mirrored and swapped.
        (Beamline::SNBL, _) => (-to, -from),
        // Dubble WAXS: shifted by half a turn.
        (Beamline::Dubble, IType::WAXS) => (from - 180., to - 180.),
        (Beamline::Dubble, IType::SAXS) | (Beamline::None, _) => (from, to),
    };
    while lower < 0. {
        lower += 360.;
    }
    while lower >= 360. {
        lower -= 360.;
    }
    while upper <= 0. {
        upper += 360.;
    }
    while upper > 360. {
        upper -= 360.;
    }
    [lower, upper]
}
/// Updates the azimuthal range.
///
/// Nothing changes when `enable` is absent, or when it is `true` without a
/// range. `false`, or a range with fewer than two values, resets both stored
/// ranges to zero. Otherwise the sorted user range is stored and the
/// integrator range is either the same (slice mode) or its converted form.
fn set_azimuth(&mut self, azimuth: Option<Vec<f64>>, enable: Option<bool>) {
    let requested = match (enable, azimuth) {
        (None, _) | (Some(true), None) => return,
        (Some(false), _) => None,
        (Some(true), Some(values)) => {
            if values.len() < 2 {
                None
            } else {
                Some([values[0].min(values[1]), values[1].max(values[0])])
            }
        }
    };
    match requested {
        None => {
            self.user_azimuth = [0., 0.];
            self.azimuth = self.user_azimuth;
        }
        Some(user) => {
            self.user_azimuth = user;
            self.azimuth = if self.azimuth_slices {
                user
            } else {
                self.convert_azimuth(&user)
            };
        }
    }
}
/// Updates the radial range: `enable == false` or a range with fewer than
/// two values resets it to zero, a valid range is stored sorted, anything
/// else leaves it untouched.
fn set_radial(&mut self, radial: Option<Vec<f64>>, enable: Option<bool>) {
    match (enable, radial) {
        (Some(false), _) => self.radial = [0., 0.],
        (Some(true), Some(values)) => {
            self.radial = if values.len() < 2 {
                [0., 0.]
            } else {
                [values[0].min(values[1]), values[0].max(values[1])]
            };
        }
        _ => {}
    }
}
/// Stores the multi-column name when one is supplied.
fn set_multi_column(&mut self, name: Option<String>) {
    match name {
        Some(name) => self.multi_column = Arc::new(name),
        None => {}
    }
}
/// Switches azimuthal-slice mode when a value is supplied.
fn set_azimuth_slices(&mut self, enable: Option<bool>) {
    self.azimuth_slices = enable.unwrap_or(self.azimuth_slices);
}
/// Switches cake integration when a value is supplied.
fn set_cake(&mut self, enable: Option<bool>) {
    self.cake = enable.unwrap_or(self.cake);
}
/// Switches azimuthal integration when a value is supplied.
fn set_azimuthal_integration(&mut self, enable: Option<bool>) {
    self.azimuthal = enable.unwrap_or(self.azimuthal);
}
/// Replaces the averaged dark frame of the background with whatever
/// `parse_as_dark` produces for `names`; returns the messages it reported.
fn set_dark_background(&mut self, names: Option<Vec<String>>) -> Option<Vec<String>> {
    let (averaged, messages) = names.parse_as_dark(&self.detector, "background");
    self.dark_bkg = averaged;
    messages
}
/// Replaces the averaged dark frame of the sample with whatever
/// `parse_as_dark` produces for `names`; returns the messages it reported.
fn set_dark_sample(&mut self, names: Option<Vec<String>>) -> Option<Vec<String>> {
    let (averaged, messages) = names.parse_as_dark(&self.detector, "sample");
    self.dark_sample = averaged;
    messages
}
/// Updates the `save_edf` flag when a value is supplied.
fn set_save_edf(&mut self, enable: Option<bool>) {
    self.save_edf = enable.unwrap_or(self.save_edf);
}
fn set_flood(&mut self, flood: Option<String>) -> Option<String> {
if self.detector != Detector::Frelon {
self.flood = None;
return None;
}
if let Some(flood) = flood {
if flood.is_empty() {
self.flood = None;
return None;
}
let path = PathBuf::from(&flood);
match path.open(false) {
Ok(frame) => self.flood = Some(frame),
Err(e) => return Some(format!("Failed to open flood {}: {}", flood, e)),
}
}
None
}
/// Opens, normalizes and averages the background frames.
///
/// An absent list leaves the background untouched; an empty list clears it.
/// Frames that cannot be opened are reported and skipped. The background is
/// stored only if at least one frame was actually averaged. Returns the
/// collected messages, or `None` when there are none.
fn set_background(
    &mut self,
    coefficient: Option<f64>,
    names: Option<Vec<String>>,
) -> Option<Vec<String>> {
    let names = names?;
    if names.is_empty() {
        self.bkg = None;
        return None;
    }
    let coefficient = coefficient.unwrap_or(1.0);
    let mut errors = Vec::new();
    let mut frames = Vec::new();
    for name in &names {
        match PathBuf::from(name).open(false) {
            Ok(frame) => frames.push(self.normalize_bkg_frame(name, frame, &mut errors)),
            Err(e) => errors.push(format!("{}: {}", name, e)),
        }
    }
    self.bkg = if frames.is_empty() {
        None
    } else {
        let mut af = AveragedFrame::new(&frames, coefficient);
        info!(
            "Background: {} files have been averaged, transmission = {}, monitor = {}",
            af.done, af.transmission, af.monitor,
        );
        errors.append(&mut af.errors);
        if af.done > 0 {
            Some(af)
        } else {
            None
        }
    };
    if errors.is_empty() {
        None
    } else {
        Some(errors)
    }
}
/// Prepares one background frame for averaging.
///
/// The averaged dark background (if any) is subtracted when it compares equal
/// to the image under `==`; otherwise a message is pushed to `errors`. With
/// monitor normalization and a positive monitor value the image is divided by
/// the monitor. The returned item always carries `photo: 0.0`.
fn normalize_bkg_frame<'a, 'b>(
&'b self,
name: &'a str,
mut frame: Box<dyn Frame>,
errors: &mut Vec<String>,
) -> FrameToAverage<'a> {
if let Some(dark) = self.dark_bkg.as_ref() {
let mut image = frame.array_mut();
// presumably `==` on arrays compares dimensions (see the error text) — TODO confirm
if &dark.array == image {
image -= &dark.array;
} else {
errors.push("Dark and background dimensions are inconsistent".to_string());
}
}
let (monitor, transmission) = self.beamline.monitor_and_transmission(frame.as_ref());
if monitor > 0. && self.norm == Normalization::Monitor {
let mut image = frame.array_mut();
image /= monitor;
}
FrameToAverage {
frame,
name,
transmission,
monitor,
photo: 0.0,
}
}
/// Pushes a new radial bin count to the integrator, but only when it differs
/// from the one applied last.
fn set_radial_bins(&mut self, bins: Option<usize>) {
    match bins {
        Some(count) if count != self.r_bins => {
            self.i.set_radial_bins(count);
            self.r_bins = count;
        }
        _ => {}
    }
}
/// Updates the `speed` flag when a value is supplied.
fn set_speed(&mut self, speed: Option<bool>) {
    self.speed = speed.unwrap_or(self.speed);
}
/// Updates the `super_speed` flag when a value is supplied.
fn set_super_speed(&mut self, super_speed: Option<bool>) {
    self.super_speed = super_speed.unwrap_or(self.super_speed);
}
/// Selects the detector by name. An empty name means Pilatus; an unknown
/// name keeps the current detector and returns a message.
fn set_detector(&mut self, detector: Option<String>) -> Option<String> {
    let name = detector?;
    let kind = match name.as_str() {
        "" | "Pilatus" => Detector::Pilatus,
        "Frelon" => Detector::Frelon,
        _ => return Some(format!("Detector {} is not recognized", name)),
    };
    self.detector = kind;
    None
}
/// Stores the output extension: an empty value becomes `dat`, and a single
/// leading dot is removed.
fn set_extension(&mut self, extension: Option<String>) {
    let mut ext = match extension {
        Some(ext) => ext,
        None => return,
    };
    if ext.is_empty() {
        ext.push_str("dat");
    } else if ext.starts_with('.') {
        ext.remove(0);
    }
    self.extension = Arc::new(ext);
}
/// Updates the sample thickness when a value is supplied.
fn set_thickness(&mut self, thickness: Option<f64>) {
    match thickness {
        Some(value) => self.thickness = value,
        None => {}
    }
}
/// Updates the number of frames to average when a value is supplied.
fn set_average(&mut self, average: Option<usize>) {
    self.average = average.unwrap_or(self.average);
}
/// Updates the concentration when a value is supplied.
fn set_concentration(&mut self, concentration: Option<f64>) {
    match concentration {
        Some(value) => self.concentration = value,
        None => {}
    }
}
/// Pushes a new azimuthal bin count to the integrator, but only when it
/// differs from the one applied last.
fn set_azimuthal_bins(&mut self, bins: Option<usize>) {
    match bins {
        Some(count) if count != self.a_bins => {
            self.i.set_azimuthal_bins(count);
            self.a_bins = count;
        }
        _ => {}
    }
}
/// Selects the beamline by name; absent or unrecognized names leave the
/// current beamline unchanged.
fn set_beamline(&mut self, beamline: Option<String>) {
    let parsed = match beamline.as_ref().map(String::as_str) {
        Some("Dubble") => Beamline::Dubble,
        Some("SNBL") => Beamline::SNBL,
        Some("None") => Beamline::None,
        _ => return,
    };
    self.beamline = parsed;
}
/// Updates the normalization mode and range.
///
/// Unrecognized or absent mode names keep the current mode. A range with
/// fewer than two values is rejected with a message; otherwise its first two
/// values are stored.
fn set_normalization(&mut self, norm: Option<String>, rng: Option<Vec<f64>>) -> Option<String> {
    if let Some(name) = norm {
        match name.as_str() {
            "Monitor" => self.norm = Normalization::Monitor,
            "Bkg" | "Background" => self.norm = Normalization::Background,
            "Median" => self.norm = Normalization::Median,
            _ => {}
        }
    }
    let range = rng?;
    if range.len() < 2 {
        return Some(format!(
            "Not enough values in normalization range {:?}",
            range
        ));
    }
    self.norm_range = [range[0], range[1]];
    None
}
/// Updates the calibration factor when a value is supplied.
fn set_calibration(&mut self, factor: Option<f64>) {
    match factor {
        Some(value) => self.calibration = value,
        None => {}
    }
}
/// Forwards the polarization factor to the integrator when one is supplied.
fn set_polarization(&mut self, pol_fac: Option<f64>) {
    match pol_fac {
        Some(factor) => self.i.set_polarization(factor),
        None => {}
    }
}
/// Forwards the solid-angle flag to the integrator when one is supplied.
fn set_solid_angle(&mut self, sa: Option<bool>) {
    match sa {
        Some(flag) => {
            self.i.set_solid_angle(flag);
        }
        None => {}
    }
}
/// Forwards the output units to the integrator: `"q"` and `"a"` select the
/// two Q variants, everything else (including `"t"`) selects two-theta.
fn set_units(&mut self, units: Option<String>) {
    let parsed = units.map(|code| match code.as_str() {
        "q" => Units::Qnm,
        "a" => Units::QA,
        _ => Units::TwoTheta,
    });
    if let Some(parsed) = parsed {
        self.i.set_units(parsed);
    }
}
/// Sets the running flag.
fn set_running(&mut self, running: bool) {
self.running = running;
}
/// Validates and stores the data directory.
///
/// Returns a message when the path is empty, cannot be inspected or is not a
/// directory. The sub-directory is only updated together with a valid path;
/// without a path nothing changes.
fn set_path(&mut self, path: Option<String>, subdir: Option<String>) -> Option<String> {
    let path = path?;
    if path.is_empty() {
        return Some("Path cannot be empty".to_string());
    }
    match metadata(&path) {
        Err(e) => return Some(format!("Failed to get path metadata {}: {}", path, e)),
        Ok(md) if !md.is_dir() => return Some(format!("Path '{}' is not a directory", path)),
        Ok(_) => {}
    }
    self.path = path;
    if let Some(subdir) = subdir.map(PathBuf::from) {
        self.subdir = Arc::new(subdir);
    }
    None
}
/// Marks the integration as running.
fn run(&mut self) {
    self.running = true;
}
/// Marks the integration as stopped.
fn stop(&mut self) {
    self.running = false;
}
/// Whether processing is currently enabled.
pub fn is_running(&self) -> bool {
self.running
}
/// Stores the mask obtained by decompressing the given string.
///
/// An absent value leaves the mask untouched, an empty one clears it. When
/// decompression fails the mask is cleared and a message is returned.
fn set_mask(&mut self, mask: Option<String>) -> Option<String> {
    let encoded = mask?;
    if encoded.is_empty() {
        self.mask = None;
        return None;
    }
    self.mask = encoded.decompress().ok();
    if self.mask.is_some() {
        None
    } else {
        Some(String::from("Opening masks by name not yet implemented"))
    }
}
/// Loads the poni description.
///
/// The string is first treated as a compressed payload; if it cannot be
/// decompressed it is treated as a file name. Returns a message when the
/// value is empty or cannot be read; an absent value changes nothing.
fn set_poni(&mut self, poni: Option<String>) -> Option<String> {
    let source = poni?;
    if source.is_empty() {
        return Some(format!("Poni file must be specified"));
    }
    if let Ok(buffer) = source.decompress() {
        let bytes: &[u8] = buffer.as_ref();
        match Poni::read_buffer(io::BufReader::new(bytes)) {
            Ok(parsed) => self.i.set_poni(parsed),
            Err(e) => return Some(format!("Could not read poni: {}", e)),
        }
    } else {
        match Poni::read_file(&source) {
            Ok(parsed) => self.i.set_poni(parsed),
            Err(e) => return Some(format!("Could not read poni: {}", e)),
        }
    };
    None
}
/// Loads the distortion spline.
///
/// The spline is dropped for any detector other than Frelon and for an empty
/// value; an absent value leaves it untouched. The string is first treated as
/// a compressed payload and, failing that, as a file name. Returns a message
/// when it cannot be read.
fn parse_spline(&mut self, spline: Option<String>) -> Option<String> {
    if !self.detector.is_frelon() {
        self.spline = None;
        return None;
    }
    let source = spline?;
    if source.is_empty() {
        self.spline = None;
        return None;
    }
    if let Ok(buffer) = source.decompress() {
        let bytes: &[u8] = buffer.as_ref();
        match Spline::parse(io::BufReader::new(bytes)) {
            Ok(parsed) => self.set_spline(parsed),
            Err(e) => return Some(format!("Could not read spline: {}", e)),
        }
    } else {
        match Spline::open(&source) {
            Ok(parsed) => self.set_spline(parsed),
            Err(e) => return Some(format!("Could not read spline: {}", e)),
        }
    };
    None
}
/// Stores a spline that differs from the current one and re-creates the
/// distortion object so that it is initialized again for the new spline.
fn set_spline(&mut self, spline: Spline) {
    if self.spline.as_ref() == Some(&spline) {
        return;
    }
    self.spline = Some(spline);
    self.d = Distortion::new();
}
/// Current value of the `speed` flag.
pub fn speed(&self) -> bool {
self.speed
}
/// Current value of the `super_speed` flag.
pub fn super_speed(&self) -> bool {
self.super_speed
}
/// Distortion correction applies only to the Frelon detector with a spline
/// loaded.
fn is_correctable(&self) -> bool {
    if !self.detector.is_frelon() {
        return false;
    }
    self.spline.is_some()
}
/// Applies all image-level corrections to `frame` in place and returns the
/// transmission to report, or `None` when the distortion correction fails.
///
/// Order of operations: dark subtraction, flood division, distortion
/// correction, monitor normalization, background subtraction, calibration
/// scaling, masking.
fn correct_source(&self, frame: &mut dyn Frame) -> Option<f64> {
let (monitor, mut transmission) = self.beamline.monitor_and_transmission(frame);
{
let mut array = frame.array_mut();
// Dark and flood are applied only when they compare equal to the image
// under `==` (presumably a dimension check — TODO confirm).
if let Some(dark) = self.dark_sample.as_ref() {
if &dark.array == array {
array -= &dark.array
}
}
if let Some(flood) = self.flood.as_ref() {
let flood = flood.array();
if flood == array {
array /= flood;
}
}
}
if self.is_correctable() {
match self.d.correct(frame) {
Some(corrected) => frame.set_array(corrected),
None => return None,
}
}
let mut array = frame.array_mut();
if self.norm == Normalization::Monitor && monitor > 0. {
array /= monitor;
}
if let Some(bkg) = self.bkg.as_ref() {
if &bkg.array == array {
// Without usable transmission/thickness values fall back to a plain
// subtraction.
if transmission == 0. || self.thickness == 0. || bkg.transmission == 0. {
array -= &bkg.array;
} else {
// m2 uses the raw sample transmission; the reported transmission
// becomes the sample/background ratio.
let t = transmission / bkg.transmission;
let m1 = (1. - self.concentration) * t;
let m2 = transmission * self.thickness;
transmission = t;
for (int, bkg) in array.data_mut().iter_mut().zip(bkg.array.data().iter()) {
*int = (*int - *bkg * m1) / m2;
}
}
}
}
// With background normalization the calibration is applied to the pattern
// instead (see `normalize_on_bkg`).
if self.norm != Normalization::Background {
array *= self.calibration;
}
if let Some(mask) = self.mask.as_ref() {
// The mask is used only when it has exactly dim1*dim2/8 + 1 bytes.
if mask.len() == array.dim1() * array.dim2() / 8 + 1 {
for (j, val) in array.data_mut().iter_mut().enumerate() {
// Unchecked access relies on the length test above and assumes the data
// slice holds dim1*dim2 values — TODO confirm.
let mask_byte = unsafe { mask.get_unchecked(j / 8) };
// Bit j % 8 (least significant first) marks pixel j as masked.
if (mask_byte >> (j % 8) as u8) & 1 != 0 {
*val = -1.0;
}
}
}
}
Some(transmission)
}
/// Background normalization of a pattern.
///
/// Does nothing unless the normalization mode is `Background`. Otherwise the
/// intensities whose positions fall into `[norm_range[0], norm_range[1])` are
/// summed (all of them when both bounds are equal) and, if the sum is
/// non-zero, every intensity is scaled by `calibration / sum` and sigma is
/// set to the square root of the scaled intensity.
fn normalize_on_bkg(&self, p: &mut Pattern) {
    if self.norm != Normalization::Background {
        return;
    }
    let [from, to] = self.norm_range;
    let whole_pattern = from == to;
    let mut total = 0.;
    for (pos, int) in p.positions.iter().zip(&p.intensity) {
        let inside = *pos >= from && *pos < to;
        if whole_pattern || inside {
            total += *int;
        }
    }
    if total == 0. {
        return;
    }
    for (int, sigma) in p.intensity.iter_mut().zip(p.sigma.iter_mut()) {
        *int *= self.calibration / total;
        *sigma = int.sqrt();
    }
}
/// Post-processes an integrated pattern: background normalization followed by
/// the beamline-specific (azimuthal) or generic (radial) recalculation.
/// Other pattern types are left untouched.
fn correct_results(&self, d: &mut Diffractogram) {
match &mut d.data {
PatternType::Azimuthal(ref mut pattern) => {
self.normalize_on_bkg(pattern);
let azimuth = [d.azimuth_min, d.azimuth_max];
match self.beamline {
Beamline::SNBL => pattern.azimuthal_snbl(&azimuth, &self.user_azimuth),
Beamline::Dubble => match self.itype {
IType::SAXS => pattern.azimuthal_dubble_saxs(&azimuth, &self.user_azimuth),
IType::WAXS => pattern.azimuthal_dubble_waxs(&azimuth, &self.user_azimuth),
},
Beamline::None => {}
}
}
PatternType::Radial(ref mut pattern) => {
self.normalize_on_bkg(pattern);
// The same range is passed twice, so positions are only trimmed, never
// re-spaced.
let radial = [d.radial_min, d.radial_max];
pattern.radial(&radial, &radial);
}
_ => {}
}
}
/// Initializes the integrator (and, when a spline is loaded and the
/// distortion is not yet initialized for this frame, the distortion) and then
/// integrates `frame`.
///
/// # Panics
///
/// Panics if `integrate` still returns `None` after initialization.
///
/// NOTE(review): the visible caller invokes this after a failed `integrate`
/// on the same frame; `integrate` runs `correct_source` in place before it can
/// fail, so the corrections may be applied a second time here — confirm.
fn init(&mut self, mut frame: Box<dyn Frame>) -> Vec<Results> {
let frame = frame.as_mut();
debug!(
"Initializing integrator for {}x{}",
frame.dim1(),
frame.dim2()
);
self.i.init(frame);
if !self.d.is_initialized(frame) {
if let Some(spline) = self.spline.as_ref() {
debug!(
"Initializing distortion for {}x{}",
frame.dim1(),
frame.dim2()
);
let (spline_x, spline_y) = spline.calculate(frame);
self.d.init(frame, &spline_x, &spline_y);
}
}
self.integrate(frame).unwrap()
}
/// Runs a single integration of type `it` over the configured radial and
/// azimuthal ranges and wraps the diffractogram into a `Results` whose path,
/// name and transmission are still to be filled in by the caller.
fn integrate_one(&self, frame: &dyn Frame, it: IntegrationType) -> Option<Results> {
    let request = Integrable {
        frame,
        radial_range: &self.radial,
        azimuthal_range: &self.azimuth,
        integration_type: it,
    };
    self.i.integrate(&request).map(|diffractogram| Results {
        path: Default::default(),
        name: Default::default(),
        diffractogram: Some(diffractogram),
        timestamp: py_stamp(),
        transmission: 0.,
        subdir: self.subdir.clone(),
        ext: self.extension.clone(),
        save_edf: self.save_edf,
        azimuth: None,
    })
}
/// Radially integrates consecutive azimuthal slices.
///
/// In slice mode `azimuth[0]` is the start angle and `azimuth[1]` the slice
/// width; slices are produced while the slice start is below 360. Returns
/// `None` as soon as one integration fails. The caller only invokes this
/// with a positive width, which is what makes the loop terminate.
fn integrate_azimuthal_slices(&self, frame: &dyn Frame) -> Option<Vec<Results>> {
    let width = self.azimuth[1];
    let mut slice = [self.azimuth[0], self.azimuth[0] + width];
    let mut slices = Vec::new();
    while slice[0] < 360. {
        let request = Integrable {
            frame,
            radial_range: &self.radial,
            azimuthal_range: &slice,
            integration_type: IntegrationType::Radial,
        };
        let diffractogram = self.i.integrate(&request)?;
        slices.push(Results {
            path: Default::default(),
            name: Default::default(),
            diffractogram: Some(diffractogram),
            timestamp: py_stamp(),
            transmission: 0.,
            save_edf: self.save_edf,
            ext: self.extension.clone(),
            azimuth: Some(self.convert_azimuth(&slice)),
            subdir: self.subdir.clone(),
        });
        slice[0] += width;
        slice[1] = slice[0] + width;
    }
    Some(slices)
}
/// Corrects `frame` in place and runs every enabled integration on it.
///
/// The radial integration is always performed (as slices when slice mode is
/// on and the slice width is positive); azimuthal and cake integrations are
/// added when enabled. Returns `None` if the correction or any integration
/// fails. Every result is post-processed and tagged with the transmission.
fn integrate(&self, frame: &mut dyn Frame) -> Option<Vec<Results>> {
    let transmission = self.correct_source(frame)?;
    let mut collected = Vec::new();
    if self.azimuth_slices && self.azimuth[1] > 0. {
        let mut slices = self.integrate_azimuthal_slices(frame)?;
        collected.append(&mut slices);
    } else {
        collected.push(self.integrate_one(frame, IntegrationType::Radial)?);
    }
    if self.azimuthal {
        collected.push(self.integrate_one(frame, IntegrationType::Azimuthal)?);
    }
    if self.cake {
        collected.push(self.integrate_one(frame, IntegrationType::Cake)?);
    }
    for result in collected.iter_mut() {
        // Every result built above carries a diffractogram.
        self.correct_results(result.diffractogram.as_mut().unwrap());
        result.transmission = transmission;
    }
    Some(collected)
}
}
/// Directory traversal driven by the shared integration state.
trait Walker {
    /// Walks the configured data directory.
    fn walker(self);
    /// Processes an already opened directory listing (or logs its error).
    fn walk(self, paths: io::Result<ReadDir>);
}

impl Walker for Arc<RwLock<Integration>> {
    fn walker(self) {
        // The read guard is a temporary of this statement, so the lock is
        // released before the walk starts.
        let listing = fs::read_dir(&self.read().path);
        self.walk(listing);
    }

    fn walk(self, paths: io::Result<ReadDir>) {
        match paths {
            Err(e) => error!("Could not read dir: {}", e),
            Ok(entries) => entries.read(self),
        }
    }
}
trait DirReader {
fn read(self, integration: Arc<RwLock<Integration>>);
}
impl DirReader for ReadDir {
fn read(self, integration: Arc<RwLock<Integration>>) {
let mut dict = HashMap::new();
let mut paths = self.filter_map(|r| r.ok()).collect_vec();
paths.sort_by_key(|e| e.path());
for path in paths {
if !integration.read().is_running() {
return;
}
match path.process() {
WalkResult::None(e) => warn!("Could not process entry {:?}: {}", path, e),
WalkResult::Dir(path) => integration.clone().walk(fs::read_dir(&path)),
WalkResult::File(path) => {
let _ = integration.read().ch_send_n.send(1);
let task = dict.accumulate(path, integration.read().average);
if task.is_done() {
let i = integration.clone();
rayon::spawn(move || task.integrate(i));
}
}
}
}
}
}
trait PathAccumulator {
fn accumulate(&mut self, path: PathBuf, average: usize) -> Task;
}
impl PathAccumulator for HashMap<Rc<OsStr>, Vec<PathBuf>> {
fn accumulate(&mut self, path: PathBuf, average: usize) -> Task {
if average < 2 {
return Task::One(path);
}
if let Some(ext) = path.extension() {
let key: Rc<OsStr> = Rc::from(ext);
let paths = self.entry(key).or_insert(vec![]);
return if paths.len() < average {
paths.push(path);
Task::NotYet
} else {
let tasks = paths.clone();
paths.clear();
Task::Many(tasks)
};
}
Task::NotYet
}
}
/// Something that can be read into a single frame; `None` means no frame
/// could be produced.
trait FrameReader {
fn read(&self) -> Option<Box<dyn Frame>>;
}
/// Reasons why a frame is not handed to the integration.
enum BubbleError {
    /// The frame could not be opened.
    FrameError(FrameError),
    /// The frame header carries the "do not touch" key.
    AlreadyProcessed,
}

impl fmt::Display for BubbleError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        if let BubbleError::FrameError(inner) = self {
            inner.fmt(f)
        } else {
            write!(f, "frame already processed")
        }
    }
}
/// Result alias for frame opening.
type BubbleResult<T> = Result<T, BubbleError>;

/// Opening of a single frame file addressed by a path-like value.
trait SingleFrameReader: AsRef<Path> + fmt::Debug {
    /// Opens the frame. With `skip` set, a frame whose header contains
    /// `KEY_DONT_TOUCH` is rejected as already processed.
    fn open(&self, skip: bool) -> BubbleResult<Box<dyn Frame>> {
        let opened = frame::open(self).map_err(BubbleError::FrameError)?;
        if skip && opened.header().contains_key(KEY_DONT_TOUCH) {
            return Err(BubbleError::AlreadyProcessed);
        }
        Ok(opened)
    }

    /// Opens the frame while skipping already processed ones; failures are
    /// logged at debug level and turned into `None`.
    fn read(&self) -> Option<Box<dyn Frame>> {
        match self.open(true) {
            Err(e) => {
                debug!("Skipping {:?} due to error: {}", self, e);
                None
            }
            Ok(opened) => Some(opened),
        }
    }
}

impl SingleFrameReader for PathBuf {}
impl FrameReader for [PathBuf] {
    /// Reads every path of the slice and averages the frames that could be
    /// opened (with coefficient 1). Returns `None` when no frame could be
    /// read. Paths that are not valid UTF-8 are skipped, because the frame
    /// name is passed on as `&str`.
    fn read(&self) -> Option<Box<dyn Frame>> {
        // Lossy conversion: the log line must not panic on a non-UTF-8 path
        // that the loop below would simply skip.
        debug!(
            "Averaging {}",
            self.iter().map(|e| e.to_string_lossy()).join(" ")
        );
        let mut frames = vec![];
        for path in self {
            if let Some(name) = path.as_os_str().to_str() {
                if let Some(frame) = path.read() {
                    frames.push(FrameToAverage::new(frame, name));
                }
            }
        }
        if frames.is_empty() {
            None
        } else {
            Some(Box::new(AveragedFrame::new(&frames, 1.)))
        }
    }
}
/// Unit of work produced by the path accumulator.
#[derive(PartialEq)]
enum Task {
/// Nothing to process yet (a batch is still being collected).
NotYet,
/// A single frame to integrate.
One(PathBuf),
/// A batch of frames to average and integrate as one.
Many(Vec<PathBuf>),
}
impl Task {
/// `true` when there is nothing to process yet.
fn is_not_yet(&self) -> bool {
    match self {
        Task::NotYet => true,
        Task::One(_) | Task::Many(_) => false,
    }
}
/// `true` when the task carries at least one path and can be integrated.
fn is_done(&self) -> bool {
    let pending = self.is_not_yet();
    !pending
}
/// Reads the frame(s) of this task, integrates them and publishes the
/// results. Does nothing when the integration is not running.
///
/// # Panics
///
/// Panics when called on `Task::NotYet`.
fn integrate(self, i: Arc<RwLock<Integration>>) {
if !i.read().is_running() {
return;
}
// For a batch the last path of the batch names the result.
let (frame, path) = match self {
Task::One(path) => (path.read(), path),
Task::Many(mut paths) => (paths.read(), paths.remove(paths.len() - 1)),
NotYet => unreachable!("Task::integrate must be never called with Task::NotYet!"),
};
info!("Processing {:?}", path);
match frame {
Some(mut frame) => {
let results = {
// Upgradable read: integrate under the read lock and upgrade to a write
// lock only when integration fails and `init` has to run.
let i = i.upgradable_read();
match i.integrate(frame.as_mut()) {
Some(results) => results,
None => RwLockUpgradableReadGuard::upgrade(i).init(frame),
}
};
let path = Arc::new(path);
for mut result in results {
result.path = path.clone();
result.save();
let _ = i.read().ch_send_results.send(result);
}
}
None => {
// The frame was counted when it was discovered; take it off the total.
info!("Failed to open as frame: {:?}", path);
let _ = i.read().ch_send_n.send(-1);
}
}
}
}
/// Normalization mode selected in the settings.
#[derive(PartialEq, Clone, Copy)]
enum Normalization {
None,
/// Divide the image by the monitor value.
Monitor,
/// Scale the integrated pattern by the intensity sum over `norm_range`.
Background,
// Not handled by any code visible in this part of the file.
Median,
}
/// Turns a list of dark-frame names into an averaged frame.
trait DarkParser {
    /// Returns the averaged dark frame (if any) and the messages collected
    /// while opening and averaging (if any). `msg` only labels the log line.
    fn parse_as_dark(
        &self,
        detector: &Detector,
        msg: &str,
    ) -> (Option<AveragedFrame>, Option<Vec<String>>);
}

impl DarkParser for Option<Vec<String>> {
    /// Dark frames are only used with the Frelon detector; for any other
    /// detector, or for an absent or empty list, nothing is produced. Frames
    /// that cannot be opened are reported and skipped. The average is
    /// returned only when at least one frame was actually averaged.
    fn parse_as_dark(
        &self,
        detector: &Detector,
        msg: &str,
    ) -> (Option<AveragedFrame>, Option<Vec<String>>) {
        if !detector.is_frelon() {
            return (None, None);
        }
        let names = match self {
            Some(names) if !names.is_empty() => names,
            _ => return (None, None),
        };
        let mut errors = Vec::new();
        let mut frames = Vec::new();
        for name in names {
            match PathBuf::from(name).open(false) {
                Ok(frame) => frames.push(FrameToAverage::new(frame, name)),
                Err(e) => errors.push(format!("{}: {}", name, e)),
            }
        }
        let mut af = AveragedFrame::new(&frames, 1.);
        info!("Dark {}: {} files have been averaged", msg, af.done);
        errors.append(&mut af.errors);
        let averaged = if af.done > 0 { Some(af) } else { None };
        let messages = if errors.is_empty() {
            None
        } else {
            Some(errors)
        };
        (averaged, messages)
    }
}
/// Trimming and re-labelling of integrated patterns.
trait PostProcessor {
/// Trims the pattern to `range` and, when `user_range` differs from `range`,
/// re-spaces the positions over `user_range`.
fn recalculate(&mut self, range: &[f64; 2], user_range: &[f64; 2], inverted: bool);
fn radial(&mut self, range: &[f64; 2], user_range: &[f64; 2]);
fn azimuthal_snbl(&mut self, range: &[f64; 2], user_range: &[f64; 2]);
fn azimuthal_dubble_saxs(&mut self, range: &[f64; 2], user_range: &[f64; 2]);
fn azimuthal_dubble_waxs(&mut self, range: &[f64; 2], user_range: &[f64; 2]);
}
impl PostProcessor for Pattern {
fn recalculate(&mut self, range: &[f64; 2], user_range: &[f64; 2], inverted: bool) {
// An empty range means "no trimming".
if range[0] == range[1] {
return;
}
// Find the first positions that reach the lower and the upper bound.
// NOTE(review): 0 doubles as the "not found yet" marker, so a match at
// index 0 is overwritten by the next matching index, and `max` stays 0 when
// no position reaches range[1] — confirm this is intended.
let (mut min, mut max) = (0, 0);
for (i, p) in self.positions.iter().enumerate() {
if min == 0 && range[0] <= *p {
min = i;
}
if max == 0 && range[1] <= *p {
max = i;
}
if min != 0 && max != 0 {
break;
}
}
// Drop one more bin at the lower edge (inverted) or at the upper edge.
if inverted {
min += 1;
} else if max > 2 {
max -= 1;
}
let mut positions = self.positions.as_ref().clone();
positions.drain_garbage(min, max);
if *range != *user_range {
// Re-label the kept bins with evenly spaced values starting at
// user_range[0]; the last value stays one step below user_range[1].
let mut pos = user_range[0];
let step = (user_range[1] - pos) / positions.len() as f64;
for val in &mut positions {
*val = pos;
pos += step;
}
}
self.intensity.drain_garbage(min, max);
self.sigma.drain_garbage(min, max);
self.positions = Arc::new(positions);
}
/// Radial patterns: plain trimming.
fn radial(&mut self, range: &[f64; 2], user_range: &[f64; 2]) {
self.recalculate(range, user_range, false);
}
/// SNBL azimuthal patterns: inverted trimming, then intensities and sigmas
/// are reversed.
fn azimuthal_snbl(&mut self, range: &[f64; 2], user_range: &[f64; 2]) {
self.recalculate(range, user_range, true);
self.intensity.reverse();
self.sigma.reverse();
}
/// Dubble SAXS azimuthal patterns: plain trimming.
fn azimuthal_dubble_saxs(&mut self, range: &[f64; 2], user_range: &[f64; 2]) {
self.recalculate(range, user_range, false);
}
/// Dubble WAXS azimuthal patterns: plain trimming, then intensities and
/// sigmas are rotated by half of the pattern length.
fn azimuthal_dubble_waxs(&mut self, range: &[f64; 2], user_range: &[f64; 2]) {
self.recalculate(range, user_range, false);
let k = self.positions.len() / 2;
self.intensity.rotate_right(k);
self.sigma.rotate_right(k);
}
}
/// Cuts a container down to the half-open index window `[min, max)`.
trait GarbageDrainer<T> {
    /// Keeps only the elements with indices in `[min, max)`, preserving their
    /// order, and discards everything else.
    fn drain_garbage(&mut self, min: usize, max: usize);
}

impl<T> GarbageDrainer<T> for Vec<T> {
    /// Keeps the elements `self[min..max]` and drops the rest.
    ///
    /// Out-of-range bounds are clamped instead of being trusted: `max` is
    /// limited to the current length and `min` to the (clamped) `max`, so an
    /// empty or reversed window (`max <= min`) simply yields an empty vector.
    /// Removed elements are dropped properly; no `unsafe` is involved.
    fn drain_garbage(&mut self, min: usize, max: usize) {
        let end = max.min(self.len());
        let start = min.min(end);
        // Cut the tail first, so the drain below only has to shift the
        // elements that are actually kept.
        self.truncate(end);
        drop(self.drain(..start));
    }
}
impl Results {
fn parse_name(&self) -> String {
let fs = self.path.file_stem().unwrap().to_str().unwrap();
match self.diffractogram.as_ref().unwrap().data {
PatternType::Radial(_) => {
if let Some(az) = self.azimuth.as_ref() {
format!("{}_azimuthal_slice_{}_{}.{}", fs, az[0], az[1], self.ext)
} else {
format!("{}.{}", fs, self.ext)
}
}
PatternType::Azimuthal(_) => format!("{}_azimuthal.{}", fs, self.ext),
PatternType::Cake(_) => format!("{}_cake.cbf", fs),
PatternType::None => unimplemented!("PatternType::None is not implemented"),
}
}
fn make_filename(&mut self) {
self.name = self.path.parent().unwrap().to_path_buf();
self.name.push(self.subdir.as_path());
self.name.push(self.parse_name());
}
fn save_normalized_image(&self) {
let mut header = Header::new();
header.insert(KEY_DONT_TOUCH.to_string(), HeaderEntry::Number(1));
let file = match File::create(&self.name) {
Ok(w) => w,
Err(e) => {
error!("Failed to create file {:?}: {}", self.name, e);
return;
}
};
let mut writer = BufWriter::new(file);
if let Err(e) = EdfFrame::save_array(
&self.diffractogram.as_ref().unwrap().image,
&mut header,
&mut writer,
DataType::F64,
) {
error!("Failed to save edf file {:?}: {}", self.name, e);
};
}
fn save(&mut self) {
self.make_filename();
match &self.diffractogram.as_ref().unwrap().data {
PatternType::Radial(p) | PatternType::Azimuthal(p) => {
let dir = self.name.parent().unwrap();
if let Err(e) = fs::create_dir_all(dir) {
error!("Could not create directory {:?}: {}", dir, e);
return;
}
if let Err(e) = dir.set_permission(true) {
warn!("Failed to set permissions for {:?}: {}", self.name, e);
}
let file = match fs::File::create(&self.name) {
Ok(file) => file,
Err(e) => {
error!("Could not create file {:?}: {}", self.name, e);
return;
}
};
if let Err(e) = self.name.set_permission(false) {
warn!("Failed to set permissions for {:?}: {}", self.name, e);
}
let mut writer = BufWriter::new(file);
if self.transmission != 0. {
if let Err(e) =
writeln!(writer, "# Transmission coefficient: {}", self.transmission)
{
error!("Could not write file {:?}: {}", self.name, e);
return;
}
}
if let Err(e) = p.to_text(&mut writer) {
error!("Could not write file {:?}: {}", self.name, e);
}
if self.save_edf {
self.name.set_extension("edf");
self.save_normalized_image();
self.name.set_extension(&self.ext.as_str());
}
}
PatternType::Cake(c) => {
let mut h = Header::new();
h.insert(KEY_DONT_TOUCH.to_owned(), HeaderEntry::Number(1));
let file = match File::create(&self.name) {
Ok(w) => w,
Err(e) => {
error!("Failed to create file {:?}: {}", self.name, e);
return;
}
};
let mut writer = BufWriter::new(file);
let fs = self.name.file_stem().unwrap().to_str().unwrap();
if let Err(e) = CbfFrame::save_array(&c.cake, h, fs, &mut writer) {
error!("Failed to save cake cbf {:?}: {}", self.name, e);
}
}
PatternType::None => {}
}
}
pub fn pack(&self, bin: usize, speed: bool) -> (String, String) {
let mut pattern = String::new();
let mut frame = String::new();
let p = vec![];
let mut buf = Cursor::new(p);
if let Some(d) = &self.diffractogram {
match &d.data {
PatternType::Radial(p) | PatternType::Azimuthal(p) => {
if let Ok(_) = p.to_text(&mut buf) {
pattern = buf.get_ref().compress()
}
}
_ => {}
}
}
if speed {
return (pattern, frame);
}
if let Some(p) = &self.diffractogram {
let mut header = Header::new();
header.insert(KEY_DONT_TOUCH.to_owned(), HeaderEntry::Number(1));
let image = &p.image;
let p = Vec::with_capacity(image.len() * mem::size_of::<f32>());
let mut buf = Cursor::new(p);
let resized = image.resize(bin);
if let Ok(_) = if let Some(resized) = resized {
EdfFrame::save_array(&resized, &mut header, &mut buf, edf::DataType::F32)
} else {
EdfFrame::save_array(image, &mut header, &mut buf, edf::DataType::F32)
} {
frame = buf.get_ref().compress();
}
}
(pattern, frame)
}
}
impl Beamline {
    /// Reads the monitor value and the transmission coefficient for a frame.
    ///
    /// Returns `(monitor, transmission)`:
    /// * `SNBL` - the `cbf::KEY_FLUX` header value and `0.`;
    /// * `Dubble` - the `KEY_DUBBLE_MONITOR` header value, falling back to the
    ///   sum of the frame when the header value is zero; the transmission is
    ///   the `KEY_DUBBLE_PHOTO` header value divided by the monitor. If the
    ///   monitor is still zero, `(0., 0.)` is returned;
    /// * `None` - `(0., 0.)`.
    fn monitor_and_transmission(&self, frame: &dyn Frame) -> (f64, f64) {
        match self {
            Beamline::None => (0., 0.),
            Beamline::SNBL => {
                let flux = frame.get_header_float(cbf::KEY_FLUX);
                (flux, 0.)
            }
            Beamline::Dubble => {
                let from_header = frame.get_header_float(KEY_DUBBLE_MONITOR);
                // A zero header value means "unknown": use the total intensity.
                let monitor = if from_header == 0. {
                    frame.sum()
                } else {
                    from_header
                };
                if monitor != 0. {
                    let photo = frame.get_header_float(KEY_DUBBLE_PHOTO);
                    (monitor, photo / monitor)
                } else {
                    (0., 0.)
                }
            }
        }
    }
}
/// Applies fixed access permissions to a filesystem entry.
trait FilePermission {
    /// Sets the permissions of the entry; `is_dir` tells whether the entry is a
    /// directory (mode `0o755`) or a regular file (mode `0o644`).
    fn set_permission(&self, is_dir: bool) -> io::Result<()>;
}

impl<T: AsRef<Path>> FilePermission for T {
    /// Non-Windows targets: the mode is `0o755` for directories and `0o644`
    /// for files.
    #[cfg(not(target_os = "windows"))]
    fn set_permission(&self, is_dir: bool) -> io::Result<()> {
        let mode = if is_dir { 0o755 } else { 0o644 };
        fs::set_permissions(self, fs::Permissions::from_mode(mode))
    }

    /// Windows: permissions are left untouched and the call always succeeds.
    #[cfg(target_os = "windows")]
    fn set_permission(&self, _is_dir: bool) -> io::Result<()> {
        Ok(())
    }
}