d3_components/
components.rs

1#![allow(dead_code)]
2#[allow(unused_imports)] use super::*;
3
/// ComponentCmd is the instruction set for components. It provides a means of
/// starting and stopping a component. Additionally, it signals when a service
/// has a new session, along with the sender for the coordinator of the session.
#[derive(Debug, MachineImpl)]
pub enum ComponentCmd {
    /// Starts a component. Some components don't need to be told to start, others do.
    /// Start is sent soon after the server is started and is automatic. It notifies
    /// the component that it can complete any deferred setup and should be in a
    /// running state.
    Start,
    /// Stops a component. Stop is sent as the server is shutting down and
    /// is automatic. It notifies the component that the server is about to stop
    /// and that the component should clean up anything it needs to clean up before
    /// the server stops.
    Stop,
    /// NewSession announces that there's a new session which other components
    /// may want to know about. The tuple is a session Uuid, a service type,
    /// and an anonymous sender. Presumably, the component responding to this
    /// is able to convert the sender to a sender which it can interact with.
    NewSession(u128, settings::Service, Arc<dyn std::any::Any + Send + Sync>),
}
25
/// Shorthand for an anonymous sender: a type-erased, reference-counted value that
/// can be downcast back to a sender for a specific instruction set. The following
/// example illustrates how to convert an `AnySender` to a `ComponentSender`.
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # use std::sync::Arc;
/// # use crossbeam;
/// # use crossbeam::channel::{Sender,Receiver};
/// # type AnySender = Arc<dyn std::any::Any + Send + Sync>;
/// # enum ComponentCmd { Start, Stop }
/// # type ComponentSender = Sender<ComponentCmd>;
/// # enum StateTable { Start, Stop }
/// # fn channel<T>() -> (crossbeam::channel::Sender<T>, crossbeam::channel::Receiver<T>) {
/// #   crossbeam::channel::unbounded()
/// # }
/// # fn main() -> Result<(), Box<dyn Error>> {
/// #
/// /// Consume `any_sender`, returning `Some(ComponentSender)` when it wraps a
/// /// `ComponentSender`, or `None` otherwise.
/// fn as_component_sender(any_sender: AnySender) -> Option<ComponentSender> {
///     // A successful downcast yields an `Arc<ComponentSender>`; clone the
///     // sender out of it, as it's not copyable.
///     any_sender
///         .downcast::<ComponentSender>()
///         .ok()
///         .map(|sender| (*sender).clone())
/// }
///
/// let (sender, _receiver) = channel::<ComponentCmd>();
/// let any_sender: AnySender = Arc::new(sender);
/// assert!(as_component_sender(any_sender).is_some());
///
/// let (sender, _receiver) = channel::<StateTable>();
/// let any_sender: AnySender = Arc::new(sender);
/// assert!(as_component_sender(any_sender).is_none());
/// #
/// # Ok(())
/// # }
/// ```
pub type AnySender = Arc<dyn std::any::Any + Send + Sync>;
/// Shorthand for a sender that accepts `ComponentCmd` instructions.
pub type ComponentSender = Sender<ComponentCmd>;
69
/// ComponentError describes the types of errors that a component may return.
/// This is most used during configuration.
///
/// It implements `std::fmt::Display` and `std::error::Error`, so it can be
/// printed for users and propagated with `?` into a `Box<dyn std::error::Error>`.
#[derive(Debug)]
pub enum ComponentError {
    /// Indicates that a component was unable to fully activate itself.
    /// The payload is a human-readable explanation.
    NotEnabled(String),
    /// Indicates that the component was unable to obtain
    /// enough configuration information to fully activate itself.
    /// The payload is a human-readable explanation.
    BadConfig(String),
}

impl std::fmt::Display for ComponentError {
    /// Writes a short description of the error kind followed by its explanation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NotEnabled(reason) => write!(f, "component not enabled: {}", reason),
            Self::BadConfig(reason) => write!(f, "bad component configuration: {}", reason),
        }
    }
}

// No underlying source error is carried, so the default trait methods suffice.
impl std::error::Error for ComponentError {}
80
81/// ComponentInfo describes an active component. It provides the component type
82/// and the sender for the component. It is assembled during the config startup phase
83/// and sent to each coordinator.
84#[derive(Debug, Clone)]
85pub struct ComponentInfo {
86    component: settings::Component,
87    sender: ComponentSender,
88}
89impl ComponentInfo {
90    /// Creates and new ComponentInfo struct and returns it.
91    pub const fn new(component: settings::Component, sender: ComponentSender) -> Self { Self { component, sender } }
92    /// Get the component type for this component
93    pub const fn component(&self) -> &settings::Component { &self.component }
94    /// Get a reference to the sender for this component
95    pub const fn sender(&self) -> &ComponentSender { &self.sender }
96}
97
#[cfg(test)]
mod tests {
    #[allow(unused_imports)] use super::*;
    use simplelog::*;

    use d3_core::executor;

    /// A second instruction set, used to check that a sender for a different
    /// instruction set is not mistaken for a `Sender<ComponentCmd>`.
    #[derive(Debug, MachineImpl)]
    pub enum TestMessage {
        Test,
    }

    #[test]
    fn test_new_session() {
        // tests that we can have an instruction with any sender as:
        // Arc<dyn std::any::Any + Send + Sync> and convert back to
        // the correct sender. This is critical as we use the service
        // to indicate the type of sender, and only components which
        // understand the service should attempt to participate by
        // decoding the sender and responding appropriately.
        #[derive(Default)]
        struct Controller {
            // set to 1 once a well-formed NewSession has been received
            counter: AtomicCell<usize>,
        }
        impl Machine<ComponentCmd> for Controller {
            fn receive(&self, cmd: ComponentCmd) {
                match cmd {
                    ComponentCmd::NewSession(conn_id, service, sender) => {
                        assert_eq!(conn_id, 12345);
                        assert_eq!(service, "EchoServer".to_string());
                        // downcast consumes the Arc, so probe the wrong type on a clone
                        assert!(
                            Arc::clone(&sender).downcast::<Sender<TestMessage>>().is_err(),
                            "sender must not downcast to Sender<TestMessage>"
                        );
                        assert!(
                            sender.downcast::<Sender<ComponentCmd>>().is_ok(),
                            "sender must downcast to Sender<ComponentCmd>"
                        );
                        self.counter.store(1);
                    },
                    other => panic!("unexpected command: {:?}", other),
                }
            }
        }
        impl Machine<TestMessage> for Controller {
            fn receive(&self, cmd: TestMessage) {
                panic!("unexpected test message: {:?}", cmd);
            }
        }
        // install a simple logger; a logger may already be installed by another
        // test in this process, in which case init fails and we keep the existing one
        let _ = CombinedLogger::init(vec![TermLogger::new(LevelFilter::Error, Config::default(), TerminalMode::Mixed)]);
        executor::start_server();
        thread::sleep(std::time::Duration::from_millis(20));
        let (m, component_sender) = executor::connect::<_, ComponentCmd>(Controller::default());
        let _test_sender = executor::and_connect::<_, TestMessage>(&m);

        let sent = component_sender.send(ComponentCmd::NewSession(
            12345,
            "EchoServer".to_string(),
            Arc::new(component_sender.clone()),
        ));
        assert!(sent.is_ok(), "failed to send NewSession to the controller");

        // give the executor time to deliver the instruction
        thread::sleep(std::time::Duration::from_millis(50));
        let received = m.lock().counter.load();
        // stop the server before asserting, so a failed assertion doesn't leave it running
        executor::stop_server();
        assert_eq!(received, 1);
    }
}