d3_components/components.rs
1#![allow(dead_code)]
2#[allow(unused_imports)] use super::*;
3
4/// ComponentCmd is the instruction set for components. It provides a means of
5/// starting, and stopping a component. Additionally, it signals when a service
6/// has a new session along with the sender for the coordinator of the session.
7#[derive(Debug, MachineImpl)]
8pub enum ComponentCmd {
9 /// Starts a component, some components don't need to be told to start, others do.
10 /// Start is sent soon after the server is started and is automatic. It notifies
11 /// the component that it can complete any deferred setup and should be in a
12 /// running state.
13 Start,
14 /// Stops a component. Stop is sent as the server is shutting down and
15 /// is automatic.It notifies the component that the server is about to stop
16 /// and that the component should cleanup anything it needs to cleanup before
17 /// the server stops.
18 Stop,
19 /// NewSession announces that there's a new session which other components
20 /// may want to know about. The tupple is a session Uuid, a service type,
21 /// and an anonymous sender. Presumably, the component responding to this
22 /// is able to convert the sender to a sender which it can interact with.
23 NewSession(u128, settings::Service, Arc<dyn std::any::Any + Send + Sync>),
24}
25
26/// Shorthand for an anonymous sender, that can be sent instructions from a specific
27/// instruction set. The following example illustrates how to convert an AnySender
28/// to a ComponentSender.
29/// # Examples
30///
31/// ```
32/// # use std::error::Error;
33/// # use std::sync::Arc;
34/// # use crossbeam;
35/// # use crossbeam::channel::{Sender,Receiver};
36/// # type AnySender = Arc<dyn std::any::Any + Send + Sync>;
37/// # enum ComponentCmd { Start, Stop }
38/// # type ComponentSender = Sender<ComponentCmd>;
39/// # enum StateTable { Start, Stop }
40/// # fn channel<T>() -> (crossbeam::channel::Sender<T>, crossbeam::channel::Receiver<T>) {
41/// # crossbeam::channel::unbounded()
42/// # }
43/// # fn main() -> Result<(), Box<dyn Error>> {
44/// #
45/// /// Consume `sender` returning either `Some(sender: ComponentSender)` or 'None'.
46/// fn as_component_sender(any_sender: AnySender) -> Option<ComponentSender>
47/// {
48/// if let Ok(sender) = any_sender.downcast::<ComponentSender>() {
49/// // pull out the sender and clone it, as its not copyable
50/// Some((*sender).clone())
51/// } else {
52/// None
53/// }
54/// }
55/// let (sender, receiver) = channel::<ComponentCmd>();
56/// let any_sender: AnySender = Arc::new(sender);
57/// assert_eq!(as_component_sender(any_sender).is_some(), true);
58///
59/// let (sender, receiver) = channel::<StateTable>();
60/// let any_sender: AnySender = Arc::new(sender);
61/// assert_eq!(as_component_sender(any_sender).is_some(), false);
62///
63/// # Ok(())
64/// }
65/// ```
66pub type AnySender = Arc<dyn std::any::Any + Send + Sync>;
67/// Shorthand for a sender, that can be sent ComponentCmd instructions.
68pub type ComponentSender = Sender<ComponentCmd>;
69
70/// ComponentError describes the types of errors that a component may return.
71/// This is most used during configuration.
72#[derive(Debug)]
73pub enum ComponentError {
74 /// Indicates that a component was unable to fully activate itself.
75 NotEnabled(String),
76 /// Indicates that the component was unable to obtain
77 /// enough configuration information to fully activate itself.
78 BadConfig(String),
79}
80
81/// ComponentInfo describes an active component. It provides the component type
82/// and the sender for the component. It is assembled during the config startup phase
83/// and sent to each coordinator.
84#[derive(Debug, Clone)]
85pub struct ComponentInfo {
86 component: settings::Component,
87 sender: ComponentSender,
88}
89impl ComponentInfo {
90 /// Creates and new ComponentInfo struct and returns it.
91 pub const fn new(component: settings::Component, sender: ComponentSender) -> Self { Self { component, sender } }
92 /// Get the component type for this component
93 pub const fn component(&self) -> &settings::Component { &self.component }
94 /// Get a reference to the sender for this component
95 pub const fn sender(&self) -> &ComponentSender { &self.sender }
96}
97
98#[cfg(test)]
99mod tests {
100 #[allow(unused_imports)] use super::*;
101 use simplelog::*;
102
103 use d3_core::executor;
104
105 #[derive(Debug, MachineImpl)]
106 pub enum TestMessage {
107 Test,
108 }
109
110 #[test]
111 fn test_new_session() {
112 // tests that we can have an instruction with any sender as:
113 // Arc<dyn std::any::Any + Send + Sync> and convert back to
114 // the correct sender. This is critical as we use the service
115 // to indicate the type of sender, and only components which
116 // understand the service should attempt to participate by
117 // decoding the sender and responding appropriately.
118 #[derive(Default)]
119 struct Controller {
120 counter: AtomicCell<usize>,
121 }
122 impl Machine<ComponentCmd> for Controller {
123 fn receive(&self, cmd: ComponentCmd) {
124 match cmd {
125 ComponentCmd::NewSession(conn_id, service, sender) => {
126 assert_eq!(conn_id, 12345);
127 assert_eq!(service, "EchoServer".to_string());
128 assert_eq!(false, Arc::clone(&sender).downcast::<Sender<TestMessage>>().is_ok());
129 assert_eq!(true, Arc::clone(&sender).downcast::<Sender<ComponentCmd>>().is_ok());
130 self.counter.store(1);
131 },
132 _ => assert_eq!(true, false),
133 }
134 }
135 }
136 impl Machine<TestMessage> for Controller {
137 fn receive(&self, _cmd: TestMessage) {
138 assert_eq!(true, false);
139 }
140 }
141 // install a simple logger
142 CombinedLogger::init(vec![TermLogger::new(LevelFilter::Error, Config::default(), TerminalMode::Mixed)]).unwrap();
143 executor::start_server();
144 thread::sleep(std::time::Duration::from_millis(20));
145 let (m, component_sender) = executor::connect::<_, ComponentCmd>(Controller::default());
146 let _test_sender = executor::and_connect::<_, TestMessage>(&m);
147
148 if component_sender
149 .send(ComponentCmd::NewSession(
150 12345,
151 "EchoServer".to_string(),
152 Arc::new(component_sender.clone()),
153 ))
154 .is_err()
155 {}
156 thread::sleep(std::time::Duration::from_millis(50));
157 assert_eq!(m.lock().counter.load(), 1);
158 executor::stop_server();
159 }
160}