Skip to main content

ad_core_rs/plugin/
file_controller.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3
4use crate::error::{ADError, ADResult};
5use crate::ndarray::{NDArray, NDDataType, NDDimension};
6
7use super::file_base::{NDFileMode, NDFileWriter, NDPluginFileBase};
8use super::runtime::{
9    ParamChangeResult, ParamChangeValue, ParamUpdate, PluginParamSnapshot, ProcessResult,
10};
11
12/// Param indices for file plugin control (looked up once at registration time).
13#[derive(Default)]
14pub struct FileParamIndices {
15    pub file_path: Option<usize>,
16    pub file_name: Option<usize>,
17    pub file_number: Option<usize>,
18    pub file_template: Option<usize>,
19    pub auto_increment: Option<usize>,
20    pub write_file: Option<usize>,
21    pub read_file: Option<usize>,
22    pub write_mode: Option<usize>,
23    pub num_capture: Option<usize>,
24    pub capture: Option<usize>,
25    pub auto_save: Option<usize>,
26    pub create_dir: Option<usize>,
27    pub file_path_exists: Option<usize>,
28    pub write_status: Option<usize>,
29    pub write_message: Option<usize>,
30    pub full_file_name: Option<usize>,
31    pub file_temp_suffix: Option<usize>,
32    pub num_captured: Option<usize>,
33    pub lazy_open: Option<usize>,
34    pub delete_driver_file: Option<usize>,
35    pub free_capture: Option<usize>,
36}
37
38/// Generic file plugin controller that wraps any NDFileWriter with the full
39/// C ADCore NDPluginFile control-plane logic: auto_save, capture, stream,
40/// temp_suffix rename, create_dir, param updates, error reporting.
41///
42/// Each file format plugin (TIFF, HDF5, JPEG) creates one of these and
43/// delegates `process_array`, `register_params`, and `on_param_change` to it.
44pub struct FilePluginController<W: NDFileWriter> {
45    pub file_base: NDPluginFileBase,
46    pub writer: W,
47    pub params: FileParamIndices,
48    pub auto_save: bool,
49    pub capture_active: bool,
50    pub lazy_open: bool,
51    pub delete_driver_file: bool,
52    pub latest_array: Option<Arc<NDArray>>,
53    /// Recorded dimensions from the first frame in a stream, for validation.
54    stream_dims: Option<Vec<usize>>,
55    /// Recorded data type from the first frame in a stream, for validation.
56    stream_data_type: Option<NDDataType>,
57}
58
59impl<W: NDFileWriter> FilePluginController<W> {
60    pub fn new(writer: W) -> Self {
61        Self {
62            file_base: NDPluginFileBase::new(),
63            writer,
64            params: FileParamIndices::default(),
65            auto_save: false,
66            capture_active: false,
67            lazy_open: false,
68            delete_driver_file: false,
69            latest_array: None,
70            stream_dims: None,
71            stream_data_type: None,
72        }
73    }
74
75    /// Look up all standard file param indices from the port driver base.
76    pub fn register_params(
77        &mut self,
78        base: &mut asyn_rs::port::PortDriverBase,
79    ) -> asyn_rs::error::AsynResult<()> {
80        self.params.file_path = base.find_param("FILE_PATH");
81        self.params.file_name = base.find_param("FILE_NAME");
82        self.params.file_number = base.find_param("FILE_NUMBER");
83        self.params.file_template = base.find_param("FILE_TEMPLATE");
84        self.params.auto_increment = base.find_param("AUTO_INCREMENT");
85        self.params.write_file = base.find_param("WRITE_FILE");
86        self.params.read_file = base.find_param("READ_FILE");
87        self.params.write_mode = base.find_param("WRITE_MODE");
88        self.params.num_capture = base.find_param("NUM_CAPTURE");
89        self.params.capture = base.find_param("CAPTURE");
90        self.params.auto_save = base.find_param("AUTO_SAVE");
91        self.params.create_dir = base.find_param("CREATE_DIR");
92        self.params.file_path_exists = base.find_param("FILE_PATH_EXISTS");
93        self.params.write_status = base.find_param("WRITE_STATUS");
94        self.params.write_message = base.find_param("WRITE_MESSAGE");
95        self.params.full_file_name = base.find_param("FULL_FILE_NAME");
96        self.params.file_temp_suffix = base.find_param("FILE_TEMP_SUFFIX");
97        self.params.num_captured = base.find_param("NUM_CAPTURED");
98        self.params.lazy_open = base.find_param("FILE_LAZY_OPEN");
99        self.params.delete_driver_file = base.find_param("DELETE_DRIVER_FILE");
100        self.params.free_capture = base.find_param("FREE_CAPTURE");
101        Ok(())
102    }
103
104    /// Process an incoming array: auto_save, capture buffering, stream write.
105    pub fn process_array(&mut self, array: &NDArray) -> ProcessResult {
106        let mut proc_result = ProcessResult::empty();
107        let array = Arc::new(array.clone());
108        self.latest_array = Some(array.clone());
109
110        let result = match self.file_base.mode() {
111            NDFileMode::Single => {
112                if self.auto_save {
113                    self.write_single(array)
114                } else {
115                    Ok(())
116                }
117            }
118            NDFileMode::Capture => {
119                if self.capture_active {
120                    self.file_base.capture_array(array);
121                    self.push_num_captured_update(&mut proc_result.param_updates);
122                    if self.file_base.num_captured() >= self.file_base.num_capture_target() {
123                        if self.auto_save {
124                            if let Err(err) = self.file_base.flush_capture(&mut self.writer) {
125                                Err(err)
126                            } else {
127                                self.push_full_file_name_update(&mut proc_result.param_updates);
128                                self.push_num_captured_update(&mut proc_result.param_updates);
129                                self.capture_active = false;
130                                Ok(())
131                            }
132                        } else {
133                            self.capture_active = false;
134                            Ok(())
135                        }
136                    } else {
137                        Ok(())
138                    }
139                } else {
140                    Ok(())
141                }
142            }
143            NDFileMode::Stream => {
144                if self.capture_active {
145                    // Validate frame dimensions and data type against the first frame.
146                    let frame_dims: Vec<usize> = array.dims.iter().map(|d| d.size).collect();
147                    let frame_dtype = array.data.data_type();
148                    if let (Some(expected_dims), Some(expected_dtype)) =
149                        (&self.stream_dims, self.stream_data_type)
150                    {
151                        if &frame_dims != expected_dims || frame_dtype != expected_dtype {
152                            // Mismatched frame: skip silently (C parity behavior).
153                            return proc_result;
154                        }
155                    } else {
156                        // First frame in stream: record dimensions and data type.
157                        self.stream_dims = Some(frame_dims);
158                        self.stream_data_type = Some(frame_dtype);
159                    }
160                    let r = self.file_base.process_array(array, &mut self.writer);
161                    let target = self.file_base.num_capture_target();
162                    if r.is_ok() && target > 0 && self.file_base.num_captured() >= target {
163                        if let Err(e) = self.file_base.close_stream(&mut self.writer) {
164                            return ProcessResult::sink(self.error_updates(
165                                false,
166                                false,
167                                e.to_string(),
168                            ));
169                        }
170                        self.capture_active = false;
171                        self.stream_dims = None;
172                        self.stream_data_type = None;
173                        self.push_full_file_name_update(&mut proc_result.param_updates);
174                        self.push_num_captured_update(&mut proc_result.param_updates);
175                    }
176                    r
177                } else {
178                    Ok(())
179                }
180            }
181        };
182
183        if result.is_ok() {
184            proc_result.param_updates.extend(self.success_updates());
185            if self.file_base.mode() == NDFileMode::Single && self.auto_save {
186                self.push_full_file_name_update(&mut proc_result.param_updates);
187            }
188            if self.file_base.mode() == NDFileMode::Stream && self.capture_active {
189                self.push_full_file_name_update(&mut proc_result.param_updates);
190            }
191        } else if let Err(err) = result {
192            proc_result.param_updates = self.error_updates(false, false, err.to_string());
193        }
194        proc_result
195    }
196
197    /// Handle a control-plane param change. Returns true if the reason was handled.
198    pub fn on_param_change(
199        &mut self,
200        reason: usize,
201        params: &PluginParamSnapshot,
202    ) -> ParamChangeResult {
203        let mut updates = Vec::new();
204
205        if Some(reason) == self.params.file_path {
206            if let ParamChangeValue::Octet(s) = &params.value {
207                let normalized = normalize_file_path(s);
208                self.file_base.file_path = normalized.clone();
209                let exists =
210                    std::path::Path::new(normalized.trim_end_matches(std::path::MAIN_SEPARATOR))
211                        .is_dir();
212                if let Some(idx) = self.params.file_path_exists {
213                    updates.push(ParamUpdate::Int32 {
214                        reason: idx,
215                        addr: 0,
216                        value: if exists { 1 } else { 0 },
217                    });
218                }
219            }
220        } else if Some(reason) == self.params.file_name {
221            if let ParamChangeValue::Octet(s) = &params.value {
222                self.file_base.file_name = s.clone();
223            }
224        } else if Some(reason) == self.params.file_number {
225            self.file_base.file_number = params.value.as_i32();
226        } else if Some(reason) == self.params.file_template {
227            if let ParamChangeValue::Octet(s) = &params.value {
228                self.file_base.file_template = s.clone();
229            }
230        } else if Some(reason) == self.params.auto_increment {
231            self.file_base.auto_increment = params.value.as_i32() != 0;
232        } else if Some(reason) == self.params.auto_save {
233            self.auto_save = params.value.as_i32() != 0;
234        } else if Some(reason) == self.params.write_mode {
235            self.file_base
236                .set_mode(NDFileMode::from_i32(params.value.as_i32()));
237        } else if Some(reason) == self.params.num_capture {
238            self.file_base
239                .set_num_capture(params.value.as_i32().max(1) as usize);
240        } else if Some(reason) == self.params.create_dir {
241            self.file_base.create_dir = params.value.as_i32();
242        } else if Some(reason) == self.params.file_temp_suffix {
243            if let ParamChangeValue::Octet(s) = &params.value {
244                self.file_base.temp_suffix = s.clone();
245            }
246        } else if Some(reason) == self.params.write_file {
247            if params.value.as_i32() != 0 {
248                let result = match self.file_base.mode() {
249                    NDFileMode::Single => {
250                        if let Some(array) = self.latest_array.clone() {
251                            self.write_single(array)
252                        } else {
253                            Err(ADError::UnsupportedConversion(
254                                "no array available for write".into(),
255                            ))
256                        }
257                    }
258                    NDFileMode::Capture => self.file_base.flush_capture(&mut self.writer),
259                    NDFileMode::Stream => {
260                        if let Some(array) = self.latest_array.clone() {
261                            self.file_base.process_array(array, &mut self.writer)
262                        } else {
263                            Err(ADError::UnsupportedConversion(
264                                "no array available for write".into(),
265                            ))
266                        }
267                    }
268                };
269                match result {
270                    Ok(()) => {
271                        updates.extend(self.success_updates());
272                        self.push_num_captured_update(&mut updates);
273                        self.push_full_file_name_update(&mut updates);
274                    }
275                    Err(err) => {
276                        return ParamChangeResult::updates(self.error_updates(
277                            false,
278                            true,
279                            err.to_string(),
280                        ));
281                    }
282                }
283            }
284        } else if Some(reason) == self.params.read_file {
285            if params.value.as_i32() != 0 {
286                let result = (|| -> ADResult<Arc<NDArray>> {
287                    let path = PathBuf::from(self.file_base.create_file_name());
288                    self.writer.open_file(
289                        &path,
290                        NDFileMode::Single,
291                        &NDArray::new(vec![NDDimension::new(1)], NDDataType::UInt8),
292                    )?;
293                    let array = Arc::new(self.writer.read_file()?);
294                    self.writer.close_file()?;
295                    self.latest_array = Some(array.clone());
296                    Ok(array)
297                })();
298                match result {
299                    Ok(array) => {
300                        updates.extend(self.success_updates());
301                        self.push_full_file_name_update(&mut updates);
302                        return ParamChangeResult::combined(vec![array], updates);
303                    }
304                    Err(err) => {
305                        return ParamChangeResult::updates(self.error_updates(
306                            true,
307                            false,
308                            err.to_string(),
309                        ));
310                    }
311                }
312            }
313        } else if Some(reason) == self.params.lazy_open {
314            self.lazy_open = params.value.as_i32() != 0;
315        } else if Some(reason) == self.params.delete_driver_file {
316            self.delete_driver_file = params.value.as_i32() != 0;
317        } else if Some(reason) == self.params.free_capture {
318            if params.value.as_i32() != 0 {
319                self.file_base.clear_capture();
320                self.push_num_captured_update(&mut updates);
321            }
322        } else if Some(reason) == self.params.capture {
323            if params.value.as_i32() != 0 {
324                match self.file_base.mode() {
325                    NDFileMode::Single => {
326                        self.capture_active = false;
327                        return ParamChangeResult::updates(self.error_updates(
328                            false,
329                            false,
330                            "ERROR: capture not supported in Single mode".into(),
331                        ));
332                    }
333                    NDFileMode::Capture => {
334                        self.file_base.clear_capture();
335                        self.file_base.lazy_open = self.lazy_open;
336                        self.file_base.delete_driver_file = self.delete_driver_file;
337                        self.capture_active = true;
338                        self.push_num_captured_update(&mut updates);
339                    }
340                    NDFileMode::Stream => {
341                        self.file_base.lazy_open = self.lazy_open;
342                        self.file_base.delete_driver_file = self.delete_driver_file;
343                        self.capture_active = true;
344                        self.stream_dims = None;
345                        self.stream_data_type = None;
346                        self.push_num_captured_update(&mut updates);
347                    }
348                }
349            } else {
350                if self.file_base.mode() == NDFileMode::Stream {
351                    if let Err(err) = self.file_base.close_stream(&mut self.writer) {
352                        return ParamChangeResult::updates(self.error_updates(
353                            false,
354                            false,
355                            err.to_string(),
356                        ));
357                    }
358                }
359                self.capture_active = false;
360                self.stream_dims = None;
361                self.stream_data_type = None;
362            }
363        }
364
365        ParamChangeResult::updates(updates)
366    }
367
368    // ── helpers ──
369
370    fn write_single(&mut self, array: Arc<NDArray>) -> ADResult<()> {
371        self.file_base.ensure_directory()?;
372        self.file_base.process_array(array, &mut self.writer)
373    }
374
375    fn success_updates(&self) -> Vec<ParamUpdate> {
376        let mut updates = Vec::new();
377        if let Some(idx) = self.params.file_number {
378            updates.push(ParamUpdate::Int32 {
379                reason: idx,
380                addr: 0,
381                value: self.file_base.file_number,
382            });
383        }
384        if let Some(idx) = self.params.write_status {
385            updates.push(ParamUpdate::Int32 {
386                reason: idx,
387                addr: 0,
388                value: 0,
389            });
390        }
391        if let Some(idx) = self.params.write_message {
392            updates.push(ParamUpdate::Octet {
393                reason: idx,
394                addr: 0,
395                value: String::new(),
396            });
397        }
398        if let Some(idx) = self.params.write_file {
399            updates.push(ParamUpdate::Int32 {
400                reason: idx,
401                addr: 0,
402                value: 0,
403            });
404        }
405        if let Some(idx) = self.params.capture {
406            updates.push(ParamUpdate::Int32 {
407                reason: idx,
408                addr: 0,
409                value: if self.capture_active { 1 } else { 0 },
410            });
411        }
412        if let Some(idx) = self.params.read_file {
413            updates.push(ParamUpdate::Int32 {
414                reason: idx,
415                addr: 0,
416                value: 0,
417            });
418        }
419        updates
420    }
421
422    fn push_num_captured_update(&self, updates: &mut Vec<ParamUpdate>) {
423        if let Some(idx) = self.params.num_captured {
424            updates.push(ParamUpdate::Int32 {
425                reason: idx,
426                addr: 0,
427                value: self.file_base.num_captured() as i32,
428            });
429        }
430    }
431
432    fn push_full_file_name_update(&self, updates: &mut Vec<ParamUpdate>) {
433        if let Some(idx) = self.params.full_file_name {
434            updates.push(ParamUpdate::Octet {
435                reason: idx,
436                addr: 0,
437                value: self.file_base.last_written_name().to_string(),
438            });
439        }
440    }
441
442    fn error_updates(
443        &self,
444        read_reason: bool,
445        write_reason: bool,
446        message: String,
447    ) -> Vec<ParamUpdate> {
448        let mut updates = Vec::new();
449        if write_reason {
450            if let Some(idx) = self.params.write_file {
451                updates.push(ParamUpdate::Int32 {
452                    reason: idx,
453                    addr: 0,
454                    value: 0,
455                });
456            }
457        }
458        if read_reason {
459            if let Some(idx) = self.params.read_file {
460                updates.push(ParamUpdate::Int32 {
461                    reason: idx,
462                    addr: 0,
463                    value: 0,
464                });
465            }
466        }
467        if let Some(idx) = self.params.write_status {
468            updates.push(ParamUpdate::Int32 {
469                reason: idx,
470                addr: 0,
471                value: 1,
472            });
473        }
474        if let Some(idx) = self.params.write_message {
475            updates.push(ParamUpdate::Octet {
476                reason: idx,
477                addr: 0,
478                value: message,
479            });
480        }
481        updates
482    }
483}
484
485fn normalize_file_path(path: &str) -> String {
486    if path.is_empty() || path.ends_with(std::path::MAIN_SEPARATOR) {
487        path.to_string()
488    } else {
489        format!("{path}{}", std::path::MAIN_SEPARATOR)
490    }
491}