1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
//! This module contains the definition for an agent_tk module.

use crate::delegation::{InputMessage, OutputMessage};

/// Defines an agent module.
///
/// Each module receives messages through its input channel and sends messages through its
/// output channel. All messages are broadcast. Each module has a single thread used for
/// handling the messages. If longer computations are needed, a module can spawn other modules.
pub trait Module
{
  /// Type of the messages sent to the module.
  type InputMessage;
  /// Type of the messages emitted by the module.
  type OutputMessage;
  /// Module-side endpoint type, constructed from the receiving end of the input channel and
  /// the sending end of the output channel.
  type ModulePrivateInterface: ModulePrivateInterface<Self::InputMessage, Self::OutputMessage>;

  /// Creates the input and output broadcast channels of a module and splits their endpoints
  /// into a public interface and a private one.
  ///
  /// `cap` is the capacity given to each of the two channels.
  ///
  /// Returns a pair made of:
  /// - the [`ModuleInterface`], holding the input sender and the (inactive) output receiver;
  /// - the [`Module::ModulePrivateInterface`], built from the (inactive) input receiver and the
  ///   output sender.
  ///
  /// # Panics
  ///
  /// Panics if `cap` is zero, as `async_broadcast::broadcast` requires a positive capacity.
  fn prepare_interfaces(
    cap: usize,
  ) -> (
    ModuleInterface<Self::InputMessage, Self::OutputMessage>,
    Self::ModulePrivateInterface,
  )
  {
    // `deactivate` consumes the receiver, so the bindings do not need to be mutable.
    let (input_sender, input_receiver) = async_broadcast::broadcast::<Self::InputMessage>(cap);
    let (output_sender, output_receiver) =
      async_broadcast::broadcast::<Self::OutputMessage>(cap);

    // Both receivers are handed over deactivated: an inactive receiver keeps its channel open
    // without receiving messages itself.
    (
      ModuleInterface::<Self::InputMessage, Self::OutputMessage>::new(
        input_sender,
        output_receiver.deactivate(),
      ),
      ModulePrivateInterface::new(input_receiver.deactivate(), output_sender),
    )
  }
}

/// Module-side view of the channels of a module: the receiving end of the input channel and
/// the sending end of the output channel.
///
/// Implementors only need to provide the constructor; `Module::prepare_interfaces` calls it
/// with the endpoints it creates.
pub trait ModulePrivateInterface<InputMessage, OutputMessage>
{
  /// Builds the private interface from its two endpoints.
  ///
  /// - `input_receiver`: inactive receiver of the module's input channel.
  /// - `output_sender`: sender of the module's output channel.
  fn new(
    input_receiver: async_broadcast::InactiveReceiver<InputMessage>,
    output_sender: async_broadcast::Sender<OutputMessage>,
  ) -> Self;
}

/// Public-side view of the channels of a module: the sending end of the input channel and an
/// inactive receiver of the output channel.
///
/// Note: because `Clone` is derived, the generated implementation requires both `InputMessage`
/// and `OutputMessage` to be `Clone`.
#[derive(Clone)]
pub struct ModuleInterface<InputMessage, OutputMessage>
{
  // Sending end of the module's input channel.
  input_sender: async_broadcast::Sender<InputMessage>,
  // Inactive receiver of the module's output channel; activated on demand by `output_receiver()`.
  output_receiver: async_broadcast::InactiveReceiver<OutputMessage>,
}

impl<InputMessage, OutputMessage> ModuleInterface<InputMessage, OutputMessage>
{
  /// Assembles the interface from the sender of the input channel and an inactive receiver of
  /// the output channel.
  fn new(
    input_sender: async_broadcast::Sender<InputMessage>,
    output_receiver: async_broadcast::InactiveReceiver<OutputMessage>,
  ) -> Self
  {
    Self {
      input_sender,
      output_receiver,
    }
  }

  /// Returns a clone of the sender of the input channel.
  pub(crate) fn input_sender(&self) -> async_broadcast::Sender<InputMessage>
  {
    Clone::clone(&self.input_sender)
  }

  /// Returns a new active receiver of the output channel, created as an activated clone of the
  /// stored inactive receiver.
  pub(crate) fn output_receiver(&self) -> async_broadcast::Receiver<OutputMessage>
  {
    let inactive = &self.output_receiver;
    inactive.activate_cloned()
  }
}

/// Generates a public struct holding the module-side channel endpoints and implements
/// `ModulePrivateInterface` for it.
///
/// Arguments, in order:
/// - the name of the struct to generate;
/// - the type of the input messages;
/// - the type of the output messages.
///
/// The generated struct has two private fields, `input_receiver` and `output_sender`, filled
/// by the generated `new` constructor.
macro_rules! create_module_private_interface {
  ($iface:ident, $in_msg:ty, $out_msg:ty) => {
    // The struct is generated at the call site, so documentation cannot be attached here.
    #[allow(missing_docs)]
    pub struct $iface
    {
      input_receiver: async_broadcast::InactiveReceiver<$in_msg>,
      output_sender: async_broadcast::Sender<$out_msg>,
    }
    impl $crate::module::ModulePrivateInterface<$in_msg, $out_msg> for $iface
    {
      fn new(
        input_receiver: async_broadcast::InactiveReceiver<$in_msg>,
        output_sender: async_broadcast::Sender<$out_msg>,
      ) -> Self
      {
        $iface {
          input_receiver,
          output_sender,
        }
      }
    }
  };
}

pub(crate) use create_module_private_interface;