// agent_tk/decision/default.rs
use async_broadcast::Sender;
4use futures::join;
5
6use crate::prelude::*;
7use definitions::task as definitions_task;
8use delegation::Task as DelTask;
9
/// Options for the default decision [`Module`].
///
/// The struct currently has no fields; `start` receives it and ignores it. It exists so
/// that settings can be added later without changing the module's start-up signature.
#[derive(Debug, Clone, Default)]
pub struct Options {}

impl Options
{
  /// Creates the default (empty) set of options. Equivalent to `Options::default()`.
  pub fn new() -> Self
  {
    Self::default()
  }
}
38
39module::create_module_private_interface!(
46 ModulePrivateInterface,
47 decision::InputMessage,
48 decision::OutputMessage
49);
50
51struct EndOfQueue
52{
53 total_cost: f32,
54 final_states: crate::states::States,
55 queue_length: usize,
56}
57
/// Default decision module.
///
/// The type carries no data: all behaviour is in its associated functions and in its
/// `decision::Module` implementation.
pub struct Module {}
60
61impl Module
62{
63 async fn execute_tst_task(
64 uuid: &uuid::Uuid,
65 tst: &definitions::tst::Node,
66 requester_uri: &String,
67 agent_data: &agent::AgentData,
68 end_of_queue: &ccutils::sync::ArcMutex<EndOfQueue>,
69 output_sender: &Sender<decision::OutputMessage>,
70 _execution_input_sender: &Sender<execution::InputMessage>,
71 ) -> Result<()>
72 {
73 agent_data.knowledge_base.insert(
74 "tasks",
75 uuid.to_hex(),
76 &definitions_task::Task::from_tst(tst.clone()),
77 )?;
78 let queue_length = end_of_queue.lock()?.queue_length;
79 if 10 * queue_length > 8 * consts::TASK_QUEUE_LENGTH
80 {
81 return Ok(());
82 }
83 let states = end_of_queue.lock()?.final_states.clone();
84 let sr = crate::simulation::tst::simulate_execution(
85 states,
86 agent_data.capabilities.clone(),
87 tst.clone(),
88 )
89 .await;
90 let total_cost = end_of_queue.lock()?.total_cost;
91 match sr
92 {
93 Ok(sr) =>
94 {
95 agent_data
96 .knowledge_base
97 .insert("task_simulation_result", uuid.to_hex(), &sr)?;
98 ccutils::log::log_error!(
99 output_sender
100 .broadcast(decision::OutputMessage::CFPProposal {
101 proposal: delegation::Proposal {
102 agent_uri: agent_data.agent_uri.to_owned(),
103 requester_uri: requester_uri.to_owned(),
104 cost: total_cost + sr.get_estimated_cost(),
105 signature: Default::default(),
106 task_uuid: uuid.to_owned(),
107 },
108 })
109 .await,
110 "While sending CFP Proposal"
111 );
112 Ok(())
113 }
114 Err(e) => match e
115 {
116 Error::ExecutionFailed(ef) => match *ef
117 {
118 Error::UnknownCapability(_) => Ok(()),
119 e => Err(e),
120 },
121 e => Err(e),
122 },
123 }
124 }
125 async fn handle_input_message(
126 msg: decision::InputMessage,
127 agent_data: &agent::AgentData,
128 end_of_queue: &ccutils::sync::ArcMutex<EndOfQueue>,
129 output_sender: &Sender<decision::OutputMessage>,
130 execution_input_sender: &Sender<execution::InputMessage>,
131 ) -> Result<()>
132 {
133 match msg
134 {
135 decision::InputMessage::DecideCFPAcceptance { cfp } =>
136 {
137 let task = definitions_task::Task::from_description(&cfp.task_type, &cfp.task_description)?;
138
139 match task.get_container()
140 {
141 definitions_task::TaskContainer::Tst(tst) =>
142 {
143 Self::execute_tst_task(
144 &task.task_id(),
145 tst,
146 &cfp.requester_uri,
147 agent_data,
148 end_of_queue,
149 output_sender,
150 execution_input_sender,
151 )
152 .await?;
153 }
154 definitions_task::TaskContainer::Goal(goal) =>
155 {
156 let tst = conversion::goal::ToTst::convert(goal)?;
157 Self::execute_tst_task(
158 &task.task_id(),
159 &tst,
160 &cfp.requester_uri,
161 agent_data,
162 end_of_queue,
163 output_sender,
164 execution_input_sender,
165 )
166 .await?;
167 }
168 }
169 }
170 decision::InputMessage::QueueExecution {
171 uuid,
172 requester_uri,
173 } =>
174 {
175 let accept = 10 * end_of_queue.lock()?.queue_length < 8 * consts::TASK_QUEUE_LENGTH;
176 if accept
177 {
178 ccutils::log::log_error!(
179 execution_input_sender
180 .broadcast(execution::InputMessage::QueueExecution { uuid })
181 .await,
182 "Sending queue execution"
183 );
184 }
185 ccutils::log::log_error!(
186 output_sender
187 .broadcast(decision::OutputMessage::QueueExecutionResult {
188 uuid,
189 accepted: accept,
190 requester_uri,
191 })
192 .await,
193 "sneding queued execution result"
194 );
195 }
196 decision::InputMessage::CancelExecution { uuid } =>
197 {
198 ccutils::log::log_error!(
199 execution_input_sender
200 .broadcast(execution::InputMessage::CancelExecution {
201 uuid: uuid.to_owned(),
202 })
203 .await,
204 "sending execution cancelled"
205 );
206 }
207 }
208
209 Ok(())
210 }
211}
212
213impl decision::Module for Module
214{
215 type Options = Options;
216 async fn start(
217 agent_data: agent::AgentData,
218 module_interfaces: (decision::ModuleInterface, ModulePrivateInterface),
219 execution_interface: execution::ModuleInterface,
220 _: Options,
221 )
222 {
223 let (_, module_private_interface) = module_interfaces;
224
225 let mut input_receiver = module_private_interface.input_receiver.activate();
226 let output_sender = module_private_interface.output_sender;
227
228 let end_of_queue: ccutils::sync::ArcMutex<_> = EndOfQueue {
229 total_cost: 0.0,
230 final_states: agent_data.states.to_owned_states().unwrap(),
231 queue_length: 0,
232 }
233 .into();
234
235 let execution_input_sender = execution_interface.input_sender();
236
237 let fut_ir = {
238 let end_of_queue = end_of_queue.clone();
239 async move {
240 loop
241 {
242 let msg = input_receiver.recv().await;
243 if let Ok(msg) = msg
244 {
245 if let Err(e) = Self::handle_input_message(
246 msg,
247 &agent_data,
248 &end_of_queue,
249 &output_sender,
250 &execution_input_sender,
251 )
252 .await
253 {
254 log::error!(
255 "An error occured for agent {} when handling decision message: {}",
256 agent_data.agent_uri,
257 e
258 );
259 }
260 }
261 else
262 {
263 log::info!("Decision loop for {:?} is closing", agent_data.agent_uri);
264 return;
265 }
266 }
267 }
268 };
269 let mut execution_output_receiver = execution_interface.output_receiver();
270 let fut_exec_or = async move {
271 loop
272 {
273 let msg = execution_output_receiver.recv().await;
274 if let Ok(msg) = msg
275 {
276 match msg
277 {
278 execution::OutputMessage::CurrentEstimatedIdlingState {
279 current_cost,
280 final_states,
281 queue_length,
282 } =>
283 {
284 let mut end_of_queue = end_of_queue.lock().unwrap();
285 end_of_queue.total_cost = current_cost;
286 end_of_queue.final_states = final_states;
287 end_of_queue.queue_length = queue_length;
288 }
289 }
290 }
291 else
292 {
293 return;
294 }
295 }
296 };
297 join!(fut_ir, fut_exec_or);
298 }
299}
300
301impl module::Module for Module
302{
303 type InputMessage = decision::InputMessage;
304 type OutputMessage = decision::OutputMessage;
305 type ModulePrivateInterface = ModulePrivateInterface;
306}