1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
#![allow(dead_code)] #[allow(unused_imports)] use super::*; /// ComponentCmd is the instruction set for components. It provides a means of /// starting, and stopping a component. Additionally, it signals when a service /// has a new session along with the sender for the coordinator of the session. #[derive(Debug, MachineImpl)] pub enum ComponentCmd { /// Starts a component, some components don't need to be told to start, others do. /// Start is sent soon after the server is started and is automatic. It notifies /// the component that it can complete any deferred setup and should be in a /// running state. Start, /// Stops a component. Stop is sent as the server is shutting down and /// is automatic.It notifies the component that the server is about to stop /// and that the component should cleanup anything it needs to cleanup before /// the server stops. Stop, /// NewSession announces that there's a new session which other components /// may want to know about. The tupple is a session Uuid, a service type, /// and an anonymous sender. Presumably, the component responding to this /// is able to convert the sender to a sender which it can interact with. NewSession(u128, settings::Service, Arc<dyn std::any::Any + Send + Sync>), } /// Shorthand for an anonymous sender, that can be sent instructions from a specific /// instruction set. The following example illustrates how to convert an AnySender /// to a ComponentSender. /// # Examples /// /// ``` /// # use std::error::Error; /// # use std::sync::Arc; /// # use crossbeam; /// # use crossbeam::channel::{Sender,Receiver}; /// # type AnySender = Arc<dyn std::any::Any + Send + Sync>; /// # enum ComponentCmd { Start, Stop } /// # type ComponentSender = Sender<ComponentCmd>; /// # enum StateTable { Start, Stop } /// # fn channel<T>() -> (crossbeam::channel::Sender<T>, crossbeam::channel::Receiver<T>) { /// # crossbeam::channel::unbounded() /// # } /// # fn main() -> Result<(), Box<dyn Error>> { /// # /// /// Consume `sender` returning either `Some(sender: ComponentSender)` or 'None'. /// fn as_component_sender(any_sender: AnySender) -> Option<ComponentSender> /// { /// if let Ok(sender) = any_sender.downcast::<ComponentSender>() { /// // pull out the sender and clone it, as its not copyable /// Some((*sender).clone()) /// } else { /// None /// } /// } /// let (sender, receiver) = channel::<ComponentCmd>(); /// let any_sender: AnySender = Arc::new(sender); /// assert_eq!(as_component_sender(any_sender).is_some(), true); /// /// let (sender, receiver) = channel::<StateTable>(); /// let any_sender: AnySender = Arc::new(sender); /// assert_eq!(as_component_sender(any_sender).is_some(), false); /// /// # Ok(()) /// } /// ``` pub type AnySender = Arc<dyn std::any::Any + Send + Sync>; /// Shorthand for a sender, that can be sent ComponentCmd instructions. pub type ComponentSender = Sender<ComponentCmd>; /// ComponentError describes the types of errors that a component may return. /// This is most used during configuration. #[derive(Debug)] pub enum ComponentError { /// Indicates that a component was unable to fully activate itself. NotEnabled(String), /// Indicates that the component was unable to obtain /// enough configuration information to fully activate itself. BadConfig(String), } /// ComponentInfo describes an active component. It provides the component type /// and the sender for the component. It is assembled during the config startup phase /// and sent to each coordinator. #[derive(Debug, Clone)] pub struct ComponentInfo { component: settings::Component, sender: ComponentSender, } impl ComponentInfo { /// Creates and new ComponentInfo struct and returns it. pub const fn new(component: settings::Component, sender: ComponentSender) -> Self { Self { component, sender } } /// Get the component type for this component pub const fn component(&self) -> settings::Component { self.component } /// Get a reference to the sender for this component pub const fn sender(&self) -> &ComponentSender { &self.sender } } #[cfg(test)] mod tests { #[allow(unused_imports)] use super::*; use simplelog::*; use crate::settings::Service; use d3_core::executor; #[derive(Debug, MachineImpl)] pub enum TestMessage { Test, } #[test] fn test_new_session() { // tests that we can have an instruction with any sender as: // Arc<dyn std::any::Any + Send + Sync> and convert back to // the correct sender. This is critical as we use the service // to indicate the type of sender, and only components which // understand the service should attempt to participate by // decoding the sender and responding appropriately. #[derive(Default)] struct Controller { counter: AtomicCell<usize>, } impl Machine<ComponentCmd> for Controller { fn receive(&self, cmd: ComponentCmd) { println!("recv"); match cmd { ComponentCmd::NewSession(conn_id, service, sender) => { assert_eq!(conn_id, 12345); assert_eq!(service, Service::EchoServer); assert_eq!(false, Arc::clone(&sender).downcast::<Sender<TestMessage>>().is_ok()); assert_eq!(true, Arc::clone(&sender).downcast::<Sender<ComponentCmd>>().is_ok()); self.counter.store(1); }, _ => assert_eq!(true, false), } } } impl Machine<TestMessage> for Controller { fn receive(&self, _cmd: TestMessage) { assert_eq!(true, false); } } // install a simple logger CombinedLogger::init(vec![TermLogger::new( LevelFilter::Error, Config::default(), TerminalMode::Mixed, )]) .unwrap(); // tweaks for more responsive testing executor::set_selector_maintenance_duration(std::time::Duration::from_millis(20)); executor::start_server(); thread::sleep(std::time::Duration::from_millis(20)); let (m, component_sender) = executor::connect::<_, ComponentCmd>(Controller::default()); let _test_sender = executor::and_connect::<_, TestMessage>(&m); if component_sender .send(ComponentCmd::NewSession( 12345, Service::EchoServer, Arc::new(component_sender.clone()), )) .is_err() {} thread::sleep(std::time::Duration::from_millis(50)); assert_eq!(m.lock().unwrap().counter.load(), 1); executor::stop_server(); } }