1use crate::app::format::write::HeaderWriter;
2use crate::app::parse::parser::{HeaderCollection, Response};
3use crate::app::FunctionCode;
4use crate::app::ResponseHeader;
5use crate::master::association::Association;
6use crate::master::error::TaskError;
7use crate::master::extract::extract_measurements;
8use crate::master::poll::Poll;
9use crate::master::promise::Promise;
10use crate::master::request::{Classes, EventClasses};
11use crate::master::tasks::auto::AutoTask;
12use crate::master::tasks::command::CommandTask;
13use crate::master::tasks::read::SingleReadTask;
14use crate::master::tasks::restart::RestartTask;
15use crate::master::tasks::time::TimeSyncTask;
16use crate::master::{ReadType, TaskType};
17
18use crate::master::tasks::deadbands::WriteDeadBandsTask;
19use crate::master::tasks::empty_response::EmptyResponseTask;
20use crate::master::tasks::file::authenticate::AuthFileTask;
21use crate::master::tasks::file::close::CloseFileTask;
22use crate::master::tasks::file::get_info::GetFileInfoTask;
23use crate::master::tasks::file::open::OpenFileTask;
24use crate::master::tasks::file::read::FileReadTask;
25use crate::master::tasks::file::write_block::WriteBlockTask;
26use crate::transport::FragmentAddr;
27
28pub(crate) mod auto;
29pub(crate) mod command;
30pub(crate) mod deadbands;
31pub(crate) mod empty_response;
32
33pub(crate) mod file;
34pub(crate) mod read;
35pub(crate) mod restart;
36pub(crate) mod time;
37
38pub(crate) struct AssociationTask {
40 pub(crate) dest: FragmentAddr,
42 pub(crate) details: Task,
44}
45
46impl AssociationTask {
47 pub(crate) fn new(dest: FragmentAddr, details: Task) -> Self {
48 Self { dest, details }
49 }
50}
51
52pub(crate) enum AppTask {
55 Read(ReadTask),
57 NonRead(NonReadTask),
59}
60
61pub(crate) enum Task {
62 App(AppTask),
64 LinkStatus(Promise<Result<(), TaskError>>),
66}
67
68#[derive(Copy, Clone, PartialEq, Debug)]
69pub(crate) enum TaskId {
70 LinkStatus,
71 Function(FunctionCode),
72}
73
74impl AppTask {
75 pub(crate) fn as_task_type(&self) -> TaskType {
76 match self {
77 AppTask::Read(t) => t.as_task_type(),
78 AppTask::NonRead(t) => t.as_task_type(),
79 }
80 }
81
82 pub(crate) fn function(&self) -> FunctionCode {
83 match self {
84 AppTask::Read(t) => t.function(),
85 AppTask::NonRead(t) => t.function(),
86 }
87 }
88
89 pub(crate) fn on_task_error(self, association: Option<&mut Association>, err: TaskError) {
90 match self {
91 Self::NonRead(task) => task.on_task_error(association, err),
92 Self::Read(task) => task.on_task_error(association, err),
93 }
94 }
95
96 pub(crate) fn get_id(&self) -> TaskId {
97 match self {
98 AppTask::Read(_) => TaskId::Function(FunctionCode::Read),
99 AppTask::NonRead(t) => TaskId::Function(t.function()),
100 }
101 }
102}
103
104impl Task {
105 pub(crate) fn on_task_error(self, association: Option<&mut Association>, err: TaskError) {
106 match self {
107 Self::App(task) => task.on_task_error(association, err),
108 Self::LinkStatus(promise) => promise.complete(Err(err)),
109 }
110 }
111
112 pub(crate) fn start(self, association: &mut Association) -> Option<Task> {
117 if let Task::App(AppTask::NonRead(task)) = self {
118 return task.start(association).map(|task| task.wrap());
119 }
120
121 Some(self)
122 }
123
124 pub(crate) fn get_id(&self) -> TaskId {
125 match self {
126 Task::LinkStatus(_) => TaskId::LinkStatus,
127 Task::App(task) => task.get_id(),
128 }
129 }
130}
131
132pub(crate) trait RequestWriter {
133 fn function(&self) -> FunctionCode;
134 fn write(&self, writer: &mut HeaderWriter) -> Result<(), TaskError>;
135}
136
137pub(crate) enum ReadTask {
138 PeriodicPoll(Poll),
140 StartupIntegrity(Classes),
142 EventScan(EventClasses),
144 SingleRead(SingleReadTask),
146}
147
148pub(crate) enum NonReadTask {
149 Auto(AutoTask),
151 Command(CommandTask),
153 TimeSync(TimeSyncTask),
155 Restart(RestartTask),
157 DeadBands(WriteDeadBandsTask),
159 EmptyResponseTask(EmptyResponseTask),
161 FileRead(FileReadTask),
163 AuthFile(AuthFileTask),
165 OpenFile(OpenFileTask),
167 CloseFile(CloseFileTask),
169 WriteFileBlock(WriteBlockTask),
171 GetFileInfo(GetFileInfoTask),
173}
174
175impl RequestWriter for ReadTask {
176 fn function(&self) -> FunctionCode {
177 FunctionCode::Read
178 }
179
180 fn write(&self, writer: &mut HeaderWriter) -> Result<(), TaskError> {
181 match self {
182 ReadTask::PeriodicPoll(poll) => poll.format(writer)?,
183 ReadTask::StartupIntegrity(classes) => classes.write(writer)?,
184 ReadTask::EventScan(classes) => classes.write(writer)?,
185 ReadTask::SingleRead(req) => req.format(writer)?,
186 }
187 Ok(())
188 }
189}
190
191impl RequestWriter for NonReadTask {
192 fn function(&self) -> FunctionCode {
193 self.function()
194 }
195
196 fn write(&self, writer: &mut HeaderWriter) -> Result<(), TaskError> {
197 match self {
198 NonReadTask::Auto(t) => t.write(writer)?,
199 NonReadTask::Command(t) => t.write(writer)?,
200 NonReadTask::TimeSync(t) => t.write(writer)?,
201 NonReadTask::Restart(_) => {}
202 NonReadTask::DeadBands(t) => t.write(writer)?,
203 NonReadTask::EmptyResponseTask(t) => t.write(writer)?,
204 NonReadTask::FileRead(t) => t.write(writer)?,
205 NonReadTask::GetFileInfo(t) => t.write(writer)?,
206 NonReadTask::OpenFile(t) => t.write(writer)?,
207 NonReadTask::CloseFile(t) => t.write(writer)?,
208 NonReadTask::WriteFileBlock(t) => t.write(writer)?,
209 NonReadTask::AuthFile(t) => t.write(writer)?,
210 }
211 Ok(())
212 }
213}
214
215impl From<crate::app::format::WriteError> for TaskError {
216 fn from(_: crate::app::format::WriteError) -> Self {
217 TaskError::WriteError
218 }
219}
220
221impl ReadTask {
222 pub(crate) fn wrap(self) -> Task {
223 Task::App(AppTask::Read(self))
224 }
225
226 pub(crate) async fn process_response(
227 &mut self,
228 association: &mut Association,
229 header: ResponseHeader,
230 objects: HeaderCollection<'_>,
231 ) {
232 match self {
233 ReadTask::StartupIntegrity(_) => {
234 association.handle_integrity_response(header, objects).await
235 }
236 ReadTask::PeriodicPoll(_) => association.handle_poll_response(header, objects).await,
237 ReadTask::EventScan(_) => {
238 association
239 .handle_event_scan_response(header, objects)
240 .await
241 }
242 ReadTask::SingleRead(task) => match &mut task.custom_handler {
243 Some(handler) => {
244 extract_measurements(ReadType::SinglePoll, header, objects, handler.as_mut())
245 .await
246 }
247 None => association.handle_read_response(header, objects).await,
248 },
249 }
250 }
251
252 pub(crate) fn complete(self, association: &mut Association) {
253 match self {
254 ReadTask::StartupIntegrity(_) => association.on_integrity_scan_complete(),
255 ReadTask::PeriodicPoll(poll) => association.complete_poll(poll.id),
256 ReadTask::EventScan(_) => association.on_event_scan_complete(),
257 ReadTask::SingleRead(task) => task.on_complete(),
258 }
259 }
260
261 pub(crate) fn on_task_error(self, association: Option<&mut Association>, err: TaskError) {
262 match self {
263 ReadTask::StartupIntegrity(_) => {
264 if let Some(association) = association {
265 association.on_integrity_scan_failure();
266 }
267 }
268 ReadTask::PeriodicPoll(poll) => {
269 if let Some(association) = association {
270 tracing::warn!("poll {} failed", poll.id);
271 association.complete_poll(poll.id);
272 }
273 }
274 ReadTask::EventScan(_) => {
275 if let Some(association) = association {
276 association.on_event_scan_failure();
277 }
278 }
279 ReadTask::SingleRead(task) => task.on_task_error(err),
280 }
281 }
282
283 pub(crate) fn as_task_type(&self) -> TaskType {
284 match self {
285 Self::PeriodicPoll(_) => TaskType::PeriodicPoll,
286 Self::StartupIntegrity(_) => TaskType::StartupIntegrity,
287 Self::EventScan(_) => TaskType::AutoEventScan,
288 Self::SingleRead(_) => TaskType::UserRead,
289 }
290 }
291}
292
293impl NonReadTask {
294 pub(crate) fn wrap(self) -> Task {
295 Task::App(AppTask::NonRead(self))
296 }
297
298 pub(crate) fn start(self, association: &mut Association) -> Option<NonReadTask> {
299 match self {
300 Self::Command(_) => Some(self),
301 Self::Auto(_) => Some(self),
302 Self::TimeSync(task) => task.start(association).map(|task| task.wrap()),
303 Self::Restart(_) => Some(self),
304 Self::DeadBands(_) => Some(self),
305 Self::EmptyResponseTask(_) => Some(self),
306 Self::FileRead(_) => Some(self),
307 Self::GetFileInfo(_) => Some(self),
308 Self::OpenFile(_) => Some(self),
309 Self::CloseFile(_) => Some(self),
310 Self::WriteFileBlock(_) => Some(self),
311 Self::AuthFile(_) => Some(self),
312 }
313 }
314
315 pub(crate) fn function(&self) -> FunctionCode {
316 match self {
317 Self::Command(task) => task.function(),
318 Self::Auto(task) => task.function(),
319 Self::TimeSync(task) => task.function(),
320 Self::Restart(task) => task.function(),
321 Self::DeadBands(task) => task.function(),
322 Self::EmptyResponseTask(task) => task.function(),
323 Self::FileRead(task) => task.function(),
324 Self::GetFileInfo(task) => task.function(),
325 Self::OpenFile(task) => task.function(),
326 Self::CloseFile(task) => task.function(),
327 Self::WriteFileBlock(task) => task.function(),
328 Self::AuthFile(task) => task.function(),
329 }
330 }
331
332 pub(crate) fn on_task_error(self, association: Option<&mut Association>, err: TaskError) {
333 match self {
334 Self::Command(task) => task.on_task_error(err),
335 Self::TimeSync(task) => task.on_task_error(association, err),
336 Self::Auto(task) => task.on_task_error(association, err),
337 Self::Restart(task) => task.on_task_error(err),
338 Self::DeadBands(task) => task.on_task_error(err),
339 Self::EmptyResponseTask(task) => task.on_task_error(err),
340 Self::FileRead(task) => task.on_task_error(err),
341 Self::GetFileInfo(task) => task.on_task_error(err),
342 Self::OpenFile(task) => task.on_task_error(err),
343 Self::CloseFile(task) => task.on_task_error(err),
344 Self::WriteFileBlock(task) => task.on_task_error(err),
345 Self::AuthFile(task) => task.on_task_error(err),
346 }
347 }
348
349 pub(crate) async fn handle_response(
350 self,
351 association: &mut Association,
352 response: Response<'_>,
353 ) -> Result<Option<NonReadTask>, TaskError> {
354 match self {
355 Self::Command(task) => task.handle(response),
356 Self::Auto(task) => task.handle(association, response),
357 Self::TimeSync(task) => task.handle(association, response),
358 Self::Restart(task) => task.handle(response),
359 Self::DeadBands(task) => task.handle(response),
360 Self::EmptyResponseTask(task) => task.handle(response),
361 Self::FileRead(task) => task.handle(response).await,
362 Self::GetFileInfo(task) => task.handle(response),
363 Self::OpenFile(task) => task.handle(response),
364 Self::CloseFile(task) => task.handle(response),
365 Self::WriteFileBlock(task) => task.handle(response),
366 Self::AuthFile(task) => task.handle(response),
367 }
368 }
369
370 pub(crate) fn as_task_type(&self) -> TaskType {
371 match self {
372 Self::Command(_) => TaskType::Command,
373 Self::Auto(x) => match x {
374 AutoTask::ClearRestartBit => TaskType::ClearRestartBit,
375 AutoTask::EnableUnsolicited(_) => TaskType::EnableUnsolicited,
376 AutoTask::DisableUnsolicited(_) => TaskType::DisableUnsolicited,
377 },
378 Self::TimeSync(_) => TaskType::TimeSync,
379 Self::Restart(_) => TaskType::Restart,
380 Self::DeadBands(_) => TaskType::WriteDeadBands,
381 Self::EmptyResponseTask(_) => TaskType::GenericEmptyResponse(self.function()),
382 Self::FileRead(_) => TaskType::FileRead,
383 Self::GetFileInfo(_) => TaskType::GetFileInfo,
384 Self::AuthFile(_) => TaskType::GetFileInfo,
385 Self::OpenFile(_) => TaskType::FileOpen,
386 Self::CloseFile(_) => TaskType::FileClose,
387 Self::WriteFileBlock(_) => TaskType::FileWriteBlock,
388 }
389 }
390}