1use ::std::borrow::Borrow;
4use std::time::{Duration, Instant};
5
6#[cfg(feature = "mqtt")]
7pub mod mqtt;
8
9pub mod transport;
10pub mod transport_messages;
11
12use futures::join;
13use yaaral::TaskInterface as _;
14use yaaral::time::TimeInterface as _;
15
16use crate::prelude::*;
17
/// Call for proposals (CFP) describing a task that a requester wants to delegate.
///
/// Within this module a `CFP` is built by `InputMessage::create_start_delegation` and handed to
/// the transport layer through `transport::InputMessage::SendCFP`.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct CFP
{
  /// Identifier of the delegated task. `create_start_delegation` fills it from `Task::task_id`,
  /// and incoming proposals are matched against it through `Proposal::task_uuid`.
  pub uuid: uuid::Uuid,
  /// URI of the requester, as passed to `create_start_delegation`.
  pub requester_uri: String,
  /// Optional team URI, as passed to `create_start_delegation`.
  /// NOTE(review): presumably restricts the CFP to the members of a team — confirm against the
  /// transport implementation.
  pub team_uri: Option<String>,
  /// Type of the task, as reported by `Task::task_type`.
  pub task_type: String,
  /// Textual description of the task, as produced by `Task::to_description`.
  pub task_description: String,
  /// Signature bytes. `create_start_delegation` leaves this empty.
  /// NOTE(review): presumably filled in by the transport layer before sending — confirm.
  pub signature: Vec<u8>,
}
41
/// Proposal received in answer to a [`CFP`].
///
/// The delegation task started by `Module::start` collects the proposals whose `task_uuid`
/// matches the CFP, sorts them by `cost`, and tries them in that order.
#[derive(Clone, serde::Deserialize, serde::Serialize)]
pub struct Proposal
{
  /// URI of the proposing agent; the proposal acceptance and its possible cancellation are
  /// addressed to this URI.
  pub agent_uri: String,
  /// URI of the requester.
  /// NOTE(review): presumably copied from `CFP::requester_uri` by the proposing agent — confirm.
  pub requester_uri: String,
  /// Cost announced by the proposing agent; proposals are tried in ascending cost order.
  pub cost: f32,
  /// Identifier of the task this proposal answers; compared with `CFP::uuid`.
  pub task_uuid: uuid::Uuid,
  /// Signature bytes.
  /// NOTE(review): how and where this is produced/verified is not visible in this file.
  pub signature: Vec<u8>,
}
64
/// Progress of a delegation, reported to listeners through `OutputMessage::Status`.
///
/// Only `ReceivedProposals`, `ProposalAccepted`, `NoReceivedProposals` and `NoAcceptedProposal`
/// are emitted by the code in this file; the other variants are presumably emitted elsewhere in
/// the crate (TODO confirm).
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub enum Status
{
  /// NOTE(review): by its name, the CFP has been sent; not emitted in this file.
  SendCFP,
  /// The proposal collection window is over; carries the number of proposals collected.
  ReceivedProposals(usize),
  /// An agent confirmed that it accepts to execute the task.
  ProposalAccepted,
  /// The proposal collection window ended without any matching proposal.
  NoReceivedProposals,
  /// Every collected proposal was tried and none led to a confirmed execution.
  NoAcceptedProposal,
  /// NOTE(review): by its name, execution of the task has started; not emitted in this file.
  ExecutionStarted,
  /// NOTE(review): by its name, execution of one node of the task has started; not emitted in
  /// this file.
  NodeExecutionStarted
  {
    /// Identifier of the node.
    node: uuid::Uuid,
  },
  /// NOTE(review): by its name, execution of one node of the task has completed; not emitted in
  /// this file.
  NodeExecutionCompleted
  {
    /// Identifier of the node.
    node: uuid::Uuid,
  },
}
100
/// Interface of a task that can be delegated through this module.
///
/// `InputMessage::create_start_delegation` uses `task_id`, `task_type` and `to_description` to
/// fill the [`CFP`] that is sent out.
pub trait Task: Sized
{
  /// Builds a task from a task type and a textual description.
  /// NOTE(review): presumably the inverse of `task_type`/`to_description` — confirm with the
  /// implementors.
  fn from_description(task_type: impl AsRef<str>, description: impl AsRef<str>) -> Result<Self>;
  /// Returns the type of the task; copied into `CFP::task_type`.
  fn task_type(&self) -> &str;
  /// Returns the textual description of the task; copied into `CFP::task_description`.
  fn to_description(&self) -> String;
  /// Returns the identifier of the task; copied into `CFP::uuid`.
  fn task_id(&self) -> uuid::Uuid;
}
119
/// Messages accepted as input by the delegation module.
#[derive(Clone)]
pub enum InputMessage
{
  /// Requests the delegation of a task: the CFP is sent through the transport, and proposals are
  /// then collected and tried (see `Module::start`).
  StartDelegation
  {
    /// Call for proposals to send.
    cfp: CFP,
  },
}
138
139impl InputMessage
140{
141 pub(crate) fn create_start_delegation(
143 requester_uri: String,
144 task: impl Task,
145 team_uri: Option<String>,
146 ) -> InputMessage
147 {
148 InputMessage::StartDelegation {
149 cfp: CFP {
150 uuid: task.task_id(),
151 requester_uri,
152 team_uri,
153 task_type: task.task_type().into(),
154 task_description: task.to_description(),
155 signature: Vec::<u8>::default(),
156 },
157 }
158 }
159}
160
/// Messages emitted by the delegation module.
#[derive(Clone)]
pub enum OutputMessage
{
  /// Reports the progress of a delegation.
  Status
  {
    /// Identifier of the delegation (the `uuid` of its [`CFP`]).
    uuid: uuid::Uuid,
    /// Current status of that delegation.
    status: Status,
  },
}
181
/// Interface of the delegation module, specialised for its [`InputMessage`] and
/// [`OutputMessage`] types.
pub type ModuleInterface = module::ModuleInterface<InputMessage, OutputMessage>;
190
// Declares `ModulePrivateInterface` for this module's message types. `Module::start` reads its
// `input_receiver` and `output_sender` members; the rest of the generated type is defined by the
// macro and is not visible in this file.
module::create_module_private_interface!(ModulePrivateInterface, InputMessage, OutputMessage);
198
/// Delegation module. It carries no state of its own: all the work happens in `Module::start`.
pub(crate) struct Module {}
201
202impl Module
203{
204 pub(crate) async fn start(
206 agent_data: agent::AgentData,
207 module_interfaces: (
208 crate::module::ModuleInterface<InputMessage, OutputMessage>,
209 ModulePrivateInterface,
210 ),
211 transport_interface: transport::ModuleInterface,
212 decision_interface: decision::ModuleInterface,
213 ) -> ()
214 {
215 let (_, module_private_interface) = module_interfaces;
216
217 let mut input_receiver = module_private_interface.input_receiver.activate();
218 let output_sender = module_private_interface.output_sender;
219 let transport_interface = transport_interface;
220
221 let ir_fut = {
222 let agent_data = agent_data.clone();
223 let transport_interface = transport_interface.clone();
224 async move {
225 loop
226 {
227 let msg = input_receiver.recv().await;
228 if let Ok(msg) = msg
229 {
230 match msg
231 {
232 InputMessage::StartDelegation { cfp } =>
233 {
234 let output_sender = output_sender.clone();
235 let transport_input_sender = transport_interface.input_sender();
236 let mut transport_output_receiver = transport_interface.output_receiver();
237 let agent_data_agent_uri = agent_data.agent_uri.to_owned();
238 let delegation = async move {
239 let cfp_uuid = cfp.uuid.to_owned();
240
241 match transport_input_sender
243 .broadcast_direct(transport::InputMessage::SendCFP { cfp })
244 .await
245 {
246 Ok(_) =>
247 {}
248 Err(e) => log::error!("While sending CFP: {:?}", e.to_string()),
249 }
250
251 let mut proposals = Vec::<delegation::Proposal>::new();
253 let start = Instant::now();
254 while start.elapsed().as_secs() < 10
255 {
256 let msg = crate::Runtime::timeout(
257 Duration::from_millis(10 * 1000 - start.elapsed().as_millis() as u64),
258 transport_output_receiver.recv(),
259 )
260 .await;
261 if let Ok(Ok(transport::OutputMessage::ReceivedProposal { proposal })) = msg
262 && proposal.task_uuid == cfp_uuid
263 {
264 proposals.push(proposal);
265 }
266 }
267 log::info!(
268 "Received {} proposals for delegation {:?}",
269 proposals.len(),
270 cfp_uuid
271 );
272
273 proposals.sort_by(|a, b| a.cost.partial_cmp(b.cost.borrow()).unwrap());
275 if proposals.is_empty()
276 {
277 log::error!("No proposals received.");
278 ccutils::log_error!(
279 output_sender
280 .broadcast_direct(OutputMessage::Status {
281 uuid: cfp_uuid,
282 status: Status::NoReceivedProposals,
283 })
284 .await,
285 "sending no received proposals"
286 );
287 return;
288 }
289 else
290 {
291 ccutils::log::log_error!(
292 output_sender
293 .broadcast(OutputMessage::Status {
294 uuid: cfp_uuid.to_owned(),
295 status: Status::ReceivedProposals(proposals.len()),
296 })
297 .await,
298 "sending status"
299 );
300 }
301
302 for p in proposals
304 {
305 ccutils::log::log_error!(
306 transport_input_sender
307 .broadcast(transport::InputMessage::SendProposalAcceptance {
308 acceptance: delegation::transport_messages::Acceptance {
309 agent_uri: p.agent_uri.to_owned(),
310 requester_uri: agent_data_agent_uri.to_owned(),
311 acceptance: true,
312 uuid: p.task_uuid,
313 signature: Default::default(),
314 },
315 })
316 .await,
317 "sending proposal acceptance"
318 );
319 let start = Instant::now();
320 while start.elapsed().as_secs() < 10
321 {
322 let msg = crate::Runtime::timeout(
323 Duration::from_millis(10 * 1000 - start.elapsed().as_millis() as u64),
324 transport_output_receiver.recv(),
325 )
326 .await;
327 if let Ok(Ok(transport::OutputMessage::ReceivedExecutionAccepted {
328 acceptance,
329 })) = msg
330 && acceptance.uuid == cfp_uuid
331 {
332 if acceptance.acceptance
333 {
334 ccutils::log::log_error!(
335 output_sender
336 .broadcast(OutputMessage::Status {
337 uuid: cfp_uuid,
338 status: Status::ProposalAccepted,
339 })
340 .await,
341 "sending proposal accepted"
342 );
343 return;
344 }
345 else
346 {
347 break;
348 }
349 }
350 }
351 ccutils::log::log_error!(
353 transport_input_sender
354 .broadcast(transport::InputMessage::SendCancelAcceptance {
355 cancel_acceptance: transport_messages::CancelAcceptance {
356 agent_uri: p.agent_uri,
357 uuid: p.task_uuid,
358 signature: Default::default(),
359 },
360 })
361 .await,
362 "sending acceptance cancellation"
363 );
364 }
365 ccutils::log::log_error!(
367 output_sender
368 .broadcast(OutputMessage::Status {
369 uuid: cfp_uuid.to_owned(),
370 status: Status::NoAcceptedProposal,
371 })
372 .await,
373 "sending no accepted proposal"
374 );
375 };
376 ccutils::log::log_error!(
377 agent_data.async_runtime.spawn_task(delegation).detach(),
378 "spawning delergation task"
379 );
380 }
381 }
382 }
383 else
384 {
385 return;
386 }
387 }
388 }
389 };
390 let tr_fut = {
391 let agent_data = agent_data.clone();
392 let decision_input_sender = decision_interface.input_sender();
393 let mut transport_output_receiver = transport_interface.output_receiver();
394 async move {
395 loop
396 {
397 match transport_output_receiver.recv().await
398 {
399 Ok(msg) => match msg
400 {
401 transport::OutputMessage::ReceivedCFP { cfp } =>
402 {
403 ccutils::log::log_error!(
404 decision_input_sender
405 .broadcast(decision::InputMessage::DecideCFPAcceptance { cfp })
406 .await,
407 "sending cfp acceptance"
408 );
409 }
410 transport::OutputMessage::ReceivedProposalAccepted { acceptance } =>
411 {
412 if acceptance.agent_uri == agent_data.agent_uri
413 {
414 ccutils::log::log_error!(
415 decision_input_sender
416 .broadcast(decision::InputMessage::QueueExecution {
417 uuid: acceptance.uuid,
418 requester_uri: acceptance.requester_uri,
419 })
420 .await,
421 "sending queue execution"
422 );
423 }
424 }
425 transport::OutputMessage::ReceivedCancelAcceptance { cancel_acceptance } =>
426 {
427 if cancel_acceptance.agent_uri == agent_data.agent_uri
428 {
429 ccutils::log::log_error!(
430 decision_input_sender
431 .broadcast(decision::InputMessage::CancelExecution {
432 uuid: cancel_acceptance.uuid,
433 })
434 .await,
435 "sending cancel execution"
436 );
437 }
438 }
439 _ =>
440 {}
441 },
442 Err(_) =>
443 {
444 return;
445 }
446 }
447 }
448 }
449 };
450 let dr_fut = {
451 let mut decision_output_receiver = decision_interface.output_receiver();
452 let transport_input_sender = transport_interface.input_sender();
453 async move {
454 loop
455 {
456 match decision_output_receiver.recv().await
457 {
458 Ok(msg) => match msg
459 {
460 decision::OutputMessage::CFPProposal { proposal } =>
461 {
462 ccutils::log_error!(
463 transport_input_sender
464 .broadcast(transport::InputMessage::SendProposal { proposal })
465 .await,
466 "sending proposal"
467 );
468 }
469 decision::OutputMessage::QueueExecutionResult {
470 uuid,
471 requester_uri,
472 accepted,
473 } =>
474 {
475 ccutils::log::log_error!(
476 transport_input_sender
477 .broadcast(transport::InputMessage::SendExecutionAcceptance {
478 acceptance: transport_messages::Acceptance {
479 agent_uri: agent_data.agent_uri.to_owned(),
480 requester_uri,
481 acceptance: accepted,
482 uuid,
483 signature: Default::default(),
484 },
485 })
486 .await,
487 "sending execution acceptance"
488 );
489 }
490 },
491 Err(_) =>
492 {
493 return;
494 }
495 }
496 }
497 }
498 };
499 join!(ir_fut, tr_fut, dr_fut);
500 }
501}
502
/// Registers the delegation module's message and private-interface types with the generic
/// `crate::module::Module` trait.
impl crate::module::Module for Module
{
  /// Messages accepted by this module.
  type InputMessage = InputMessage;
  /// Messages emitted by this module.
  type OutputMessage = OutputMessage;
  /// Private interface type generated by `create_module_private_interface!`.
  type ModulePrivateInterface = ModulePrivateInterface;
}
508}