dnp3/master/tasks/
mod.rs

1use crate::app::format::write::HeaderWriter;
2use crate::app::parse::parser::{HeaderCollection, Response};
3use crate::app::FunctionCode;
4use crate::app::ResponseHeader;
5use crate::master::association::Association;
6use crate::master::error::TaskError;
7use crate::master::extract::extract_measurements;
8use crate::master::poll::Poll;
9use crate::master::promise::Promise;
10use crate::master::request::{Classes, EventClasses};
11use crate::master::tasks::auto::AutoTask;
12use crate::master::tasks::command::CommandTask;
13use crate::master::tasks::read::SingleReadTask;
14use crate::master::tasks::restart::RestartTask;
15use crate::master::tasks::time::TimeSyncTask;
16use crate::master::{ReadType, TaskType};
17
18use crate::master::tasks::deadbands::WriteDeadBandsTask;
19use crate::master::tasks::empty_response::EmptyResponseTask;
20use crate::master::tasks::file::authenticate::AuthFileTask;
21use crate::master::tasks::file::close::CloseFileTask;
22use crate::master::tasks::file::get_info::GetFileInfoTask;
23use crate::master::tasks::file::open::OpenFileTask;
24use crate::master::tasks::file::read::FileReadTask;
25use crate::master::tasks::file::write_block::WriteBlockTask;
26use crate::transport::FragmentAddr;
27
28pub(crate) mod auto;
29pub(crate) mod command;
30pub(crate) mod deadbands;
31pub(crate) mod empty_response;
32
33pub(crate) mod file;
34pub(crate) mod read;
35pub(crate) mod restart;
36pub(crate) mod time;
37
38/// Queued task requiring I/O
39pub(crate) struct AssociationTask {
40    /// Destination addresses for tasks
41    pub(crate) dest: FragmentAddr,
42    /// Actual task to perform
43    pub(crate) details: Task,
44}
45
46impl AssociationTask {
47    pub(crate) fn new(dest: FragmentAddr, details: Task) -> Self {
48        Self { dest, details }
49    }
50}
51
52/// There are two broad categories of tasks. Reads
53/// require handling for multi-fragmented responses.
54pub(crate) enum AppTask {
55    /// Reads require handling for multi-fragmented responses
56    Read(ReadTask),
57    /// NonRead tasks always require FIR/FIN == 1, but might require multiple read/response cycles, e.g. SBO
58    NonRead(NonReadTask),
59}
60
61pub(crate) enum Task {
62    /// An application layer task
63    App(AppTask),
64    /// Send link status request
65    LinkStatus(Promise<Result<(), TaskError>>),
66}
67
68#[derive(Copy, Clone, PartialEq, Debug)]
69pub(crate) enum TaskId {
70    LinkStatus,
71    Function(FunctionCode),
72}
73
74impl AppTask {
75    pub(crate) fn as_task_type(&self) -> TaskType {
76        match self {
77            AppTask::Read(t) => t.as_task_type(),
78            AppTask::NonRead(t) => t.as_task_type(),
79        }
80    }
81
82    pub(crate) fn function(&self) -> FunctionCode {
83        match self {
84            AppTask::Read(t) => t.function(),
85            AppTask::NonRead(t) => t.function(),
86        }
87    }
88
89    pub(crate) fn on_task_error(self, association: Option<&mut Association>, err: TaskError) {
90        match self {
91            Self::NonRead(task) => task.on_task_error(association, err),
92            Self::Read(task) => task.on_task_error(association, err),
93        }
94    }
95
96    pub(crate) fn get_id(&self) -> TaskId {
97        match self {
98            AppTask::Read(_) => TaskId::Function(FunctionCode::Read),
99            AppTask::NonRead(t) => TaskId::Function(t.function()),
100        }
101    }
102}
103
104impl Task {
105    pub(crate) fn on_task_error(self, association: Option<&mut Association>, err: TaskError) {
106        match self {
107            Self::App(task) => task.on_task_error(association, err),
108            Self::LinkStatus(promise) => promise.complete(Err(err)),
109        }
110    }
111
112    /// Perform operation before sending and check if the request should still be sent
113    ///
114    /// Returning Some means the task should proceed, returning None means
115    /// the task was cancelled, forget about it.
116    pub(crate) fn start(self, association: &mut Association) -> Option<Task> {
117        if let Task::App(AppTask::NonRead(task)) = self {
118            return task.start(association).map(|task| task.wrap());
119        }
120
121        Some(self)
122    }
123
124    pub(crate) fn get_id(&self) -> TaskId {
125        match self {
126            Task::LinkStatus(_) => TaskId::LinkStatus,
127            Task::App(task) => task.get_id(),
128        }
129    }
130}
131
132pub(crate) trait RequestWriter {
133    fn function(&self) -> FunctionCode;
134    fn write(&self, writer: &mut HeaderWriter) -> Result<(), TaskError>;
135}
136
137pub(crate) enum ReadTask {
138    /// Periodic polls that are configured when creating associations
139    PeriodicPoll(Poll),
140    /// Integrity poll that occurs during startup, or after outstation restarts
141    StartupIntegrity(Classes),
142    /// Event scan when IIN bit is detected
143    EventScan(EventClasses),
144    /// One-time read request
145    SingleRead(SingleReadTask),
146}
147
148pub(crate) enum NonReadTask {
149    /// tasks that occur automatically during startup, or based on events or configuration,
150    Auto(AutoTask),
151    /// commands initiated from the user API
152    Command(CommandTask),
153    /// time synchronization
154    TimeSync(TimeSyncTask),
155    /// restart operation
156    Restart(RestartTask),
157    /// write dead-bands
158    DeadBands(WriteDeadBandsTask),
159    /// Generic task for anything that doesn't have response object headers
160    EmptyResponseTask(EmptyResponseTask),
161    /// Read file from the outstation
162    FileRead(FileReadTask),
163    /// Send username/password and get back an auth key
164    AuthFile(AuthFileTask),
165    /// Open a file on the outstation
166    OpenFile(OpenFileTask),
167    /// Close a file on the outstation
168    CloseFile(CloseFileTask),
169    /// Write a file block
170    WriteFileBlock(WriteBlockTask),
171    /// get info about a file
172    GetFileInfo(GetFileInfoTask),
173}
174
175impl RequestWriter for ReadTask {
176    fn function(&self) -> FunctionCode {
177        FunctionCode::Read
178    }
179
180    fn write(&self, writer: &mut HeaderWriter) -> Result<(), TaskError> {
181        match self {
182            ReadTask::PeriodicPoll(poll) => poll.format(writer)?,
183            ReadTask::StartupIntegrity(classes) => classes.write(writer)?,
184            ReadTask::EventScan(classes) => classes.write(writer)?,
185            ReadTask::SingleRead(req) => req.format(writer)?,
186        }
187        Ok(())
188    }
189}
190
191impl RequestWriter for NonReadTask {
192    fn function(&self) -> FunctionCode {
193        self.function()
194    }
195
196    fn write(&self, writer: &mut HeaderWriter) -> Result<(), TaskError> {
197        match self {
198            NonReadTask::Auto(t) => t.write(writer)?,
199            NonReadTask::Command(t) => t.write(writer)?,
200            NonReadTask::TimeSync(t) => t.write(writer)?,
201            NonReadTask::Restart(_) => {}
202            NonReadTask::DeadBands(t) => t.write(writer)?,
203            NonReadTask::EmptyResponseTask(t) => t.write(writer)?,
204            NonReadTask::FileRead(t) => t.write(writer)?,
205            NonReadTask::GetFileInfo(t) => t.write(writer)?,
206            NonReadTask::OpenFile(t) => t.write(writer)?,
207            NonReadTask::CloseFile(t) => t.write(writer)?,
208            NonReadTask::WriteFileBlock(t) => t.write(writer)?,
209            NonReadTask::AuthFile(t) => t.write(writer)?,
210        }
211        Ok(())
212    }
213}
214
215impl From<crate::app::format::WriteError> for TaskError {
216    fn from(_: crate::app::format::WriteError) -> Self {
217        TaskError::WriteError
218    }
219}
220
221impl ReadTask {
222    pub(crate) fn wrap(self) -> Task {
223        Task::App(AppTask::Read(self))
224    }
225
226    pub(crate) async fn process_response(
227        &mut self,
228        association: &mut Association,
229        header: ResponseHeader,
230        objects: HeaderCollection<'_>,
231    ) {
232        match self {
233            ReadTask::StartupIntegrity(_) => {
234                association.handle_integrity_response(header, objects).await
235            }
236            ReadTask::PeriodicPoll(_) => association.handle_poll_response(header, objects).await,
237            ReadTask::EventScan(_) => {
238                association
239                    .handle_event_scan_response(header, objects)
240                    .await
241            }
242            ReadTask::SingleRead(task) => match &mut task.custom_handler {
243                Some(handler) => {
244                    extract_measurements(ReadType::SinglePoll, header, objects, handler.as_mut())
245                        .await
246                }
247                None => association.handle_read_response(header, objects).await,
248            },
249        }
250    }
251
252    pub(crate) fn complete(self, association: &mut Association) {
253        match self {
254            ReadTask::StartupIntegrity(_) => association.on_integrity_scan_complete(),
255            ReadTask::PeriodicPoll(poll) => association.complete_poll(poll.id),
256            ReadTask::EventScan(_) => association.on_event_scan_complete(),
257            ReadTask::SingleRead(task) => task.on_complete(),
258        }
259    }
260
261    pub(crate) fn on_task_error(self, association: Option<&mut Association>, err: TaskError) {
262        match self {
263            ReadTask::StartupIntegrity(_) => {
264                if let Some(association) = association {
265                    association.on_integrity_scan_failure();
266                }
267            }
268            ReadTask::PeriodicPoll(poll) => {
269                if let Some(association) = association {
270                    tracing::warn!("poll {} failed", poll.id);
271                    association.complete_poll(poll.id);
272                }
273            }
274            ReadTask::EventScan(_) => {
275                if let Some(association) = association {
276                    association.on_event_scan_failure();
277                }
278            }
279            ReadTask::SingleRead(task) => task.on_task_error(err),
280        }
281    }
282
283    pub(crate) fn as_task_type(&self) -> TaskType {
284        match self {
285            Self::PeriodicPoll(_) => TaskType::PeriodicPoll,
286            Self::StartupIntegrity(_) => TaskType::StartupIntegrity,
287            Self::EventScan(_) => TaskType::AutoEventScan,
288            Self::SingleRead(_) => TaskType::UserRead,
289        }
290    }
291}
292
293impl NonReadTask {
294    pub(crate) fn wrap(self) -> Task {
295        Task::App(AppTask::NonRead(self))
296    }
297
298    pub(crate) fn start(self, association: &mut Association) -> Option<NonReadTask> {
299        match self {
300            Self::Command(_) => Some(self),
301            Self::Auto(_) => Some(self),
302            Self::TimeSync(task) => task.start(association).map(|task| task.wrap()),
303            Self::Restart(_) => Some(self),
304            Self::DeadBands(_) => Some(self),
305            Self::EmptyResponseTask(_) => Some(self),
306            Self::FileRead(_) => Some(self),
307            Self::GetFileInfo(_) => Some(self),
308            Self::OpenFile(_) => Some(self),
309            Self::CloseFile(_) => Some(self),
310            Self::WriteFileBlock(_) => Some(self),
311            Self::AuthFile(_) => Some(self),
312        }
313    }
314
315    pub(crate) fn function(&self) -> FunctionCode {
316        match self {
317            Self::Command(task) => task.function(),
318            Self::Auto(task) => task.function(),
319            Self::TimeSync(task) => task.function(),
320            Self::Restart(task) => task.function(),
321            Self::DeadBands(task) => task.function(),
322            Self::EmptyResponseTask(task) => task.function(),
323            Self::FileRead(task) => task.function(),
324            Self::GetFileInfo(task) => task.function(),
325            Self::OpenFile(task) => task.function(),
326            Self::CloseFile(task) => task.function(),
327            Self::WriteFileBlock(task) => task.function(),
328            Self::AuthFile(task) => task.function(),
329        }
330    }
331
332    pub(crate) fn on_task_error(self, association: Option<&mut Association>, err: TaskError) {
333        match self {
334            Self::Command(task) => task.on_task_error(err),
335            Self::TimeSync(task) => task.on_task_error(association, err),
336            Self::Auto(task) => task.on_task_error(association, err),
337            Self::Restart(task) => task.on_task_error(err),
338            Self::DeadBands(task) => task.on_task_error(err),
339            Self::EmptyResponseTask(task) => task.on_task_error(err),
340            Self::FileRead(task) => task.on_task_error(err),
341            Self::GetFileInfo(task) => task.on_task_error(err),
342            Self::OpenFile(task) => task.on_task_error(err),
343            Self::CloseFile(task) => task.on_task_error(err),
344            Self::WriteFileBlock(task) => task.on_task_error(err),
345            Self::AuthFile(task) => task.on_task_error(err),
346        }
347    }
348
349    pub(crate) async fn handle_response(
350        self,
351        association: &mut Association,
352        response: Response<'_>,
353    ) -> Result<Option<NonReadTask>, TaskError> {
354        match self {
355            Self::Command(task) => task.handle(response),
356            Self::Auto(task) => task.handle(association, response),
357            Self::TimeSync(task) => task.handle(association, response),
358            Self::Restart(task) => task.handle(response),
359            Self::DeadBands(task) => task.handle(response),
360            Self::EmptyResponseTask(task) => task.handle(response),
361            Self::FileRead(task) => task.handle(response).await,
362            Self::GetFileInfo(task) => task.handle(response),
363            Self::OpenFile(task) => task.handle(response),
364            Self::CloseFile(task) => task.handle(response),
365            Self::WriteFileBlock(task) => task.handle(response),
366            Self::AuthFile(task) => task.handle(response),
367        }
368    }
369
370    pub(crate) fn as_task_type(&self) -> TaskType {
371        match self {
372            Self::Command(_) => TaskType::Command,
373            Self::Auto(x) => match x {
374                AutoTask::ClearRestartBit => TaskType::ClearRestartBit,
375                AutoTask::EnableUnsolicited(_) => TaskType::EnableUnsolicited,
376                AutoTask::DisableUnsolicited(_) => TaskType::DisableUnsolicited,
377            },
378            Self::TimeSync(_) => TaskType::TimeSync,
379            Self::Restart(_) => TaskType::Restart,
380            Self::DeadBands(_) => TaskType::WriteDeadBands,
381            Self::EmptyResponseTask(_) => TaskType::GenericEmptyResponse(self.function()),
382            Self::FileRead(_) => TaskType::FileRead,
383            Self::GetFileInfo(_) => TaskType::GetFileInfo,
384            Self::AuthFile(_) => TaskType::GetFileInfo,
385            Self::OpenFile(_) => TaskType::FileOpen,
386            Self::CloseFile(_) => TaskType::FileClose,
387            Self::WriteFileBlock(_) => TaskType::FileWriteBlock,
388        }
389    }
390}