taple_core/message/message_task_manager/
manager.rs1use crate::{
2 commons::{
3 channel::{ChannelData, MpscChannel},
4 identifier::KeyIdentifier,
5 },
6 Notification,
7};
8use futures::future::{AbortHandle, Abortable, Aborted};
9use log::debug;
10use serde::{de::DeserializeOwned, Serialize};
11use std::collections::HashMap;
12use tokio::task::JoinHandle;
13use tokio_util::sync::CancellationToken;
14
15use super::algorithm::Algorithm;
16
17use super::super::{
18 error::Error, message_sender::MessageSender, MessageConfig, MessageTaskCommand,
19 TaskCommandContent,
20};
21
22pub struct MessageTaskManager<T>
23where
24 T: TaskCommandContent + Serialize + DeserializeOwned,
25{
26 list: HashMap<String, (JoinHandle<Result<Result<(), Error>, Aborted>>, AbortHandle)>,
27 receiver: MpscChannel<MessageTaskCommand<T>, ()>,
28 sender: MessageSender,
29 token: CancellationToken,
30 notification_tx: tokio::sync::mpsc::Sender<Notification>,
31}
32
33impl<T: TaskCommandContent + Serialize + DeserializeOwned + 'static> MessageTaskManager<T> {
34 pub fn new(
35 sender: MessageSender,
36 receiver: MpscChannel<MessageTaskCommand<T>, ()>,
37 token: CancellationToken,
38 notification_tx: tokio::sync::mpsc::Sender<Notification>,
39 ) -> MessageTaskManager<T> {
40 MessageTaskManager {
41 list: HashMap::new(),
42 receiver,
43 sender,
44 token,
45 notification_tx,
46 }
47 }
48
49 pub async fn run(mut self) {
50 loop {
51 tokio::select! {
52 msg = self.receiver.receive() => {
53 let result = self.process_input(msg).await;
54 if result.is_err() {
55 log::error!("{}", result.unwrap_err());
56 break;
57 }
58 },
59 _ = self.token.cancelled() => {
60 log::debug!("Message module shutdown received");
61 break;
62 }
63 }
64 }
65 self.token.cancel();
66 log::info!("Ended");
67 }
68
69 async fn process_input(
70 &mut self,
71 input: Option<ChannelData<MessageTaskCommand<T>, ()>>,
72 ) -> Result<(), Error> {
73 match input {
74 Some(data) => match match data {
75 crate::commons::channel::ChannelData::AskData(_) => {
76 panic!("Reciving Ask in MessageTaskManager")
77 }
78 crate::commons::channel::ChannelData::TellData(data) => data.get(),
79 } {
80 MessageTaskCommand::Request(id, message, targets, config) => match id {
81 Some(id) => {
82 self.create_indefinite_message_task(id, message, targets, config)
83 .await?;
84 }
85 None => self.create_message_task(message, targets, config)?,
86 },
87 MessageTaskCommand::Cancel(id) => {
88 self.cancel_task(&id).await?;
89 }
90 },
91 None => {
92 return Err(Error::SenderChannelError);
93 }
94 }
95 Ok(())
96 }
97
98 async fn create_indefinite_message_task(
99 &mut self,
100 id: String,
101 content: T,
102 targets: Vec<KeyIdentifier>,
103 config: MessageConfig,
104 ) -> Result<(), Error> {
105 if let Some(_entry) = self.list.get(&id) {
106 self.cancel_task(&id).await?;
107 }
108 let (abort_handle, abort_registration) = AbortHandle::new_pair();
109 self.list.insert(
110 id,
111 (
112 tokio::spawn(Abortable::new(
113 Algorithm::make_indefinite_future(
114 self.sender.clone(),
115 config,
116 content,
117 targets,
118 ),
119 abort_registration,
120 )),
121 abort_handle,
122 ),
123 );
124 Ok(())
125 }
126
127 async fn cancel_task(&mut self, id: &String) -> Result<(), Error> {
128 let Some((tokio_handler, abort_handler)) = self.list.remove(id) else {
129 return Ok(())
130 };
131 abort_handler.abort();
132 match tokio_handler.await {
133 Err(error) => return Err(Error::TaskError { source: error }),
134 Ok(inner_state) => match inner_state {
135 Ok(task_result) => {
136 if let Err(e) = task_result {
137 debug!("Indefinite task did finish with error {:?}", e);
138 }
139 }
140 Err(_) => {
141 debug!("Task {} properly cancelled", id);
142 }
143 },
144 };
145 Ok(())
146 }
147
148 fn create_message_task(
149 &mut self,
150 content: T,
151 targets: Vec<KeyIdentifier>,
152 config: MessageConfig,
153 ) -> Result<(), Error> {
154 tokio::spawn(Algorithm::make_future(
155 content,
156 targets,
157 self.sender.clone(),
158 config,
159 ));
160 Ok(())
161 }
162}