taple_core/message/message_task_manager/
manager.rs

1use crate::{
2    commons::{
3        channel::{ChannelData, MpscChannel},
4        identifier::KeyIdentifier,
5    },
6    Notification,
7};
8use futures::future::{AbortHandle, Abortable, Aborted};
9use log::debug;
10use serde::{de::DeserializeOwned, Serialize};
11use std::collections::HashMap;
12use tokio::task::JoinHandle;
13use tokio_util::sync::CancellationToken;
14
15use super::algorithm::Algorithm;
16
17use super::super::{
18    error::Error, message_sender::MessageSender, MessageConfig, MessageTaskCommand,
19    TaskCommandContent,
20};
21
22pub struct MessageTaskManager<T>
23where
24    T: TaskCommandContent + Serialize + DeserializeOwned,
25{
26    list: HashMap<String, (JoinHandle<Result<Result<(), Error>, Aborted>>, AbortHandle)>,
27    receiver: MpscChannel<MessageTaskCommand<T>, ()>,
28    sender: MessageSender,
29    token: CancellationToken,
30    notification_tx: tokio::sync::mpsc::Sender<Notification>,
31}
32
33impl<T: TaskCommandContent + Serialize + DeserializeOwned + 'static> MessageTaskManager<T> {
34    pub fn new(
35        sender: MessageSender,
36        receiver: MpscChannel<MessageTaskCommand<T>, ()>,
37        token: CancellationToken,
38        notification_tx: tokio::sync::mpsc::Sender<Notification>,
39    ) -> MessageTaskManager<T> {
40        MessageTaskManager {
41            list: HashMap::new(),
42            receiver,
43            sender,
44            token,
45            notification_tx,
46        }
47    }
48
49    pub async fn run(mut self) {
50        loop {
51            tokio::select! {
52                msg = self.receiver.receive() => {
53                    let result = self.process_input(msg).await;
54                    if result.is_err() {
55                        log::error!("{}", result.unwrap_err());
56                        break;
57                    }
58                },
59                _ = self.token.cancelled() => {
60                    log::debug!("Message module shutdown received");
61                    break;
62                }
63            }
64        }
65        self.token.cancel();
66        log::info!("Ended");
67    }
68
69    async fn process_input(
70        &mut self,
71        input: Option<ChannelData<MessageTaskCommand<T>, ()>>,
72    ) -> Result<(), Error> {
73        match input {
74            Some(data) => match match data {
75                crate::commons::channel::ChannelData::AskData(_) => {
76                    panic!("Reciving Ask in MessageTaskManager")
77                }
78                crate::commons::channel::ChannelData::TellData(data) => data.get(),
79            } {
80                MessageTaskCommand::Request(id, message, targets, config) => match id {
81                    Some(id) => {
82                        self.create_indefinite_message_task(id, message, targets, config)
83                            .await?;
84                    }
85                    None => self.create_message_task(message, targets, config)?,
86                },
87                MessageTaskCommand::Cancel(id) => {
88                    self.cancel_task(&id).await?;
89                }
90            },
91            None => {
92                return Err(Error::SenderChannelError);
93            }
94        }
95        Ok(())
96    }
97
98    async fn create_indefinite_message_task(
99        &mut self,
100        id: String,
101        content: T,
102        targets: Vec<KeyIdentifier>,
103        config: MessageConfig,
104    ) -> Result<(), Error> {
105        if let Some(_entry) = self.list.get(&id) {
106            self.cancel_task(&id).await?;
107        }
108        let (abort_handle, abort_registration) = AbortHandle::new_pair();
109        self.list.insert(
110            id,
111            (
112                tokio::spawn(Abortable::new(
113                    Algorithm::make_indefinite_future(
114                        self.sender.clone(),
115                        config,
116                        content,
117                        targets,
118                    ),
119                    abort_registration,
120                )),
121                abort_handle,
122            ),
123        );
124        Ok(())
125    }
126
127    async fn cancel_task(&mut self, id: &String) -> Result<(), Error> {
128        let Some((tokio_handler, abort_handler)) = self.list.remove(id) else {
129            return Ok(())
130        };
131        abort_handler.abort();
132        match tokio_handler.await {
133            Err(error) => return Err(Error::TaskError { source: error }),
134            Ok(inner_state) => match inner_state {
135                Ok(task_result) => {
136                    if let Err(e) = task_result {
137                        debug!("Indefinite task did finish with error {:?}", e);
138                    }
139                }
140                Err(_) => {
141                    debug!("Task {} properly cancelled", id);
142                }
143            },
144        };
145        Ok(())
146    }
147
148    fn create_message_task(
149        &mut self,
150        content: T,
151        targets: Vec<KeyIdentifier>,
152        config: MessageConfig,
153    ) -> Result<(), Error> {
154        tokio::spawn(Algorithm::make_future(
155            content,
156            targets,
157            self.sender.clone(),
158            config,
159        ));
160        Ok(())
161    }
162}