1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#![allow(dead_code)]
#[allow(unused_imports)] use super::*;

/// ComponentCmd is the instruction set for components. It provides a means of
/// starting, and stopping a component. Additionally, it signals when a service
/// has a new session along with the sender for the coordinator of the session.
#[derive(Debug, MachineImpl)]
pub enum ComponentCmd {
    /// Starts a component, some components don't need to be told to start, others do.
    /// Start is sent soon after the server is started and is automatic. It notifies
    /// the component that it can complete any deferred setup and should be in a
    /// running state.
    Start,
    /// Stops a component. Stop is sent as the server is shutting down and
    /// is automatic.It notifies the component that the server is about to stop
    /// and that the component should cleanup anything it needs to cleanup before
    /// the server stops.
    Stop,
    /// NewSession announces that there's a new session which other components
    /// may want to know about. The tupple is a session Uuid, a service type,
    /// and an anonymous sender. Presumably, the component responding to this
    /// is able to convert the sender to a sender which it can interact with.
    NewSession(u128, settings::Service, Arc<dyn std::any::Any + Send + Sync>),
}

/// Shorthand for an anonymous sender, that can be sent instructions from a specific
/// instruction set. The following example illustrates how to convert an AnySender
/// to a ComponentSender.
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # use std::sync::Arc;
/// # use crossbeam;
/// # use crossbeam::channel::{Sender,Receiver};
/// # type AnySender = Arc<dyn std::any::Any + Send + Sync>;
/// # enum ComponentCmd { Start, Stop }
/// # type ComponentSender = Sender<ComponentCmd>;
/// # enum StateTable { Start, Stop }
/// # fn channel<T>() -> (crossbeam::channel::Sender<T>, crossbeam::channel::Receiver<T>) {
/// #   crossbeam::channel::unbounded()
/// # }
/// # fn main() -> Result<(), Box<dyn Error>> {
/// #
/// /// Consume `sender` returning either `Some(sender: ComponentSender)` or 'None'.
/// fn as_component_sender(any_sender: AnySender) -> Option<ComponentSender>
/// {
///     if let Ok(sender) = any_sender.downcast::<ComponentSender>() {
///         // pull out the sender and clone it, as its not copyable
///         Some((*sender).clone())
///     } else {
///         None
///     }
/// }
/// let (sender, receiver) = channel::<ComponentCmd>();
/// let any_sender: AnySender = Arc::new(sender);
/// assert_eq!(as_component_sender(any_sender).is_some(), true);
///
/// let (sender, receiver) = channel::<StateTable>();
/// let any_sender: AnySender = Arc::new(sender);
/// assert_eq!(as_component_sender(any_sender).is_some(), false);
///
/// # Ok(())
/// }
/// ```
pub type AnySender = Arc<dyn std::any::Any + Send + Sync>;
/// Shorthand for a sender, that can be sent ComponentCmd instructions.
pub type ComponentSender = Sender<ComponentCmd>;

/// ComponentError describes the types of errors that a component may return.
/// This is most used during configuration.
#[derive(Debug)]
pub enum ComponentError {
    /// Indicates that a component was unable to fully activate itself.
    NotEnabled(String),
    /// Indicates that the component was unable to obtain
    /// enough configuration information to fully activate itself.
    BadConfig(String),
}

/// ComponentInfo describes an active component. It provides the component type
/// and the sender for the component. It is assembled during the config startup phase
/// and sent to each coordinator.
#[derive(Debug, Clone)]
pub struct ComponentInfo {
    component: settings::Component,
    sender: ComponentSender,
}
impl ComponentInfo {
    /// Creates and new ComponentInfo struct and returns it.
    pub const fn new(component: settings::Component, sender: ComponentSender) -> Self { Self { component, sender } }
    /// Get the component type for this component
    pub const fn component(&self) -> settings::Component { self.component }
    /// Get a reference to the sender for this component
    pub const fn sender(&self) -> &ComponentSender { &self.sender }
}

#[cfg(test)]
mod tests {
    #[allow(unused_imports)] use super::*;
    use simplelog::*;

    use crate::settings::Service;
    use d3_core::executor;

    #[derive(Debug, MachineImpl)]
    pub enum TestMessage {
        Test,
    }

    #[test]
    fn test_new_session() {
        // tests that we can have an instruction with any sender as:
        // Arc<dyn std::any::Any + Send + Sync> and convert back to
        // the correct sender. This is critical as we use the service
        // to indicate the type of sender, and only components which
        // understand the service should attempt to participate by
        // decoding the sender and responding appropriately.
        #[derive(Default)]
        struct Controller {
            counter: AtomicCell<usize>,
        }
        impl Machine<ComponentCmd> for Controller {
            fn receive(&self, cmd: ComponentCmd) {
                println!("recv");
                match cmd {
                    ComponentCmd::NewSession(conn_id, service, sender) => {
                        assert_eq!(conn_id, 12345);
                        assert_eq!(service, Service::EchoServer);
                        assert_eq!(false, Arc::clone(&sender).downcast::<Sender<TestMessage>>().is_ok());
                        assert_eq!(true, Arc::clone(&sender).downcast::<Sender<ComponentCmd>>().is_ok());
                        self.counter.store(1);
                    },
                    _ => assert_eq!(true, false),
                }
            }
        }
        impl Machine<TestMessage> for Controller {
            fn receive(&self, _cmd: TestMessage) {
                assert_eq!(true, false);
            }
        }
        // install a simple logger
        CombinedLogger::init(vec![TermLogger::new(
            LevelFilter::Error,
            Config::default(),
            TerminalMode::Mixed,
        )])
        .unwrap();
        // tweaks for more responsive testing
        executor::set_selector_maintenance_duration(std::time::Duration::from_millis(20));

        executor::start_server();
        thread::sleep(std::time::Duration::from_millis(20));
        let (m, component_sender) = executor::connect::<_, ComponentCmd>(Controller::default());
        let _test_sender = executor::and_connect::<_, TestMessage>(&m);

        if component_sender
            .send(ComponentCmd::NewSession(
                12345,
                Service::EchoServer,
                Arc::new(component_sender.clone()),
            ))
            .is_err()
        {}
        thread::sleep(std::time::Duration::from_millis(50));
        assert_eq!(m.lock().unwrap().counter.load(), 1);
        executor::stop_server();
    }
}