event_engine/
lib.rs

1/// # Event Engine
2/// This library provides a framework for writing event-based applications that utilize a plugin architecture.
3/// Applications built with `event-engine` are written in Rust but can utilize plugins in written in multiple languages.
4///
5/// Events correspond to statically typed messages that can be transmitted over a socket and are the basic
6/// mechanism of communication between plugins of the application. Each application defines its own event types
7/// which include mechanisms for serializing and deserializing a message to/from bytes. No assumption is made
8/// about the serialization format used for events, and different event types can utilize different formats.
9///
10/// Plugins are independent components of an application that create and consume events. Each plugin defines
11/// the event types it is interested in, and the core `event-engine` proxies events across the plugins of an
12/// application using a publish-subscribe pattern. Each application defines its own plugins which can either be
13/// internal or external. Internal plugins run as child threads within the main (Rust) application process and
14/// are necessarily written in Rust. External plugins run as separate OS processes and can be written in any
15/// language.
16///
17use std::{
18    sync::Arc,
19    thread::{self, JoinHandle},
20};
21
22use errors::EngineError;
23use log::{info, debug};
24use plugins::{ExternalPlugin, Plugin};
25use uuid::Uuid;
26use zmq::{Context, Socket};
27
28pub mod errors;
29pub mod events;
30pub mod plugins;
31
32/// Configuration for the `event-engine` application.
33pub struct AppConfig {
34    pub publish_port: i32,
35    pub subscribe_port: i32,
36}
37
38// configuration data related to creating the sockets used by the engine.
39struct SocketData {
40    pub_socket_inproc_url: String,
41    sub_socket_inproc_url: String,
42    sync_socket_port: i32,
43    sync_inproc_url: String,
44}
45
46impl Default for SocketData {
47    fn default() -> Self {
48        Self {
49            pub_socket_inproc_url: "inproc://messages".to_string(),
50            sub_socket_inproc_url: "inproc://events".to_string(),
51            sync_socket_port: 5000,
52            sync_inproc_url: "inproc://sync".to_string(),
53        }
54    }
55}
56
57/// The `event-engine` application object.
58pub struct App {
59    pub plugins: Vec<Arc<Box<dyn Plugin>>>,
60    pub external_plugins: Vec<Arc<Box<dyn ExternalPlugin>>>,
61    pub app_config: AppConfig,
62    pub context: Context,
63}
64
65impl Default for App {
66    fn default() -> Self {
67        App {
68            plugins: vec![],
69            external_plugins: vec![],
70            app_config: AppConfig {
71                publish_port: 5559,
72                subscribe_port: 5560,
73            },
74            context: Context::new(),
75        }
76    }
77}
78
79impl App {
80    pub fn new(publish_port: i32, subscribe_port: i32) -> Self {
81        let app_config = AppConfig {
82            publish_port,
83            subscribe_port,
84        };
85        App {
86            app_config,
87            ..Default::default()
88        }
89    }
90
91    /// create the zmq socket to be used for 'outgoing' events; i.e., the events the plugins will receive
92    /// on the subscriptions socket
93    fn get_outgoing_socket(&self) -> Result<Socket, EngineError> {
94        let socket_data = SocketData::default();
95        let sub_socket_tcp_url = format!("tcp://*:{}", self.app_config.subscribe_port);
96        // from the engine's perspective, this socket is a PUB
97        let outgoing = self.context.socket(zmq::PUB)?;
98        // bind the socket to the TCP port
99        outgoing.bind(&sub_socket_tcp_url).map_err(|e| {
100            EngineError::SubSocketTCPBindError(self.app_config.subscribe_port.to_string(), e)
101        })?;
102        // bind the socket to the inproc URL
103        outgoing
104            .bind(&socket_data.sub_socket_inproc_url)
105            .map_err(|_e| {
106                EngineError::SubSocketInProcBindError(socket_data.sub_socket_inproc_url.to_string())
107            })?;
108        Ok(outgoing)
109    }
110
111    /// create the zmq socket to be used for 'incoming' events; i.e., events published by the plugins
112    fn get_incoming_socket(&self) -> Result<Socket, EngineError> {
113        let socket_data = SocketData::default();
114        let pub_socket_tcp_url = format!("tcp://*:{}", self.app_config.publish_port);
115        let incoming = self.context.socket(zmq::SUB)?;
116        // bind the socket to the TCP port
117        incoming.bind(&pub_socket_tcp_url).map_err(|_e| {
118            EngineError::PubSocketTCPBindError(self.app_config.subscribe_port.to_string())
119        })?;
120        // bind the socket to the inproc URL
121        incoming
122            .bind(&socket_data.pub_socket_inproc_url)
123            .map_err(|_e| {
124                EngineError::PubSocketInProcBindError(socket_data.pub_socket_inproc_url.to_string())
125            })?;
126        // subscribe to all events
127        let filter = String::new();
128        incoming
129            .set_subscribe(filter.as_bytes())
130            .map_err(EngineError::EngineSetSubFilterAllError)?;
131        Ok(incoming)
132    }
133
134    /// start a plugin. this function does the following, from within a new thread:
135    ///   1) creates the publish and subscribe socket objects that the plugin will use for events.
136    ///   2) configures the plugin's sub socket with the plugin's subscriptions
137    ///   3) creates the sync socket that the plugin will use to sync with the engine
138    ///   4) clones the socket objects, starts a thread, and moves ownership of the cloned objects into the thread.
139    ///   5) sends the 'ready' message on the sync socket.
140    ///   6) waits for the engine's reply on the sync socket.
141    ///   7) executes the plugin's start() function.
142    fn start_plugin(
143        &self,
144        context: &zmq::Context,
145        plugin: Arc<Box<dyn Plugin>>,
146    ) -> Result<JoinHandle<()>, EngineError> {
147        let socket_data = SocketData::default();
148
149        // we need to clone the zmq context so that the thread can take ownership and avoid static lifetime
150        // requirements on the original context object. note also that creating a brand new context within the
151        // thread does not work -- the inproc endpoints will not be shared.
152        let context_clone = context.clone();
153        let plugin_name = plugin.get_name();
154        let plugin_id = plugin.get_id();
155        let builder = thread::Builder::new().name(plugin.get_name());
156        let thread_handle = builder.spawn(move || {
157            debug!("plugin ({}, {}) thread started.", plugin.get_name(), plugin.get_id());
158
159            // TODO -- in the blocked code that follows, we use a series of .unwrap()s to effectively crash
160            // the entire thread when we encounter an EngineError. Using the ? operator to "bubble" up the errors
161            // naturally does not work. We should explore alternatives though
162
163            // -------------------------------------------------------------------------------------------------
164
165            // Create the socket that plugin will use to publish new events
166            let pub_socket = context_clone
167                .socket(zmq::PUB)
168                .map_err(|_e| EngineError::PluginPubSocketError(plugin.get_id()))
169                .unwrap();
170            pub_socket
171                .connect(&socket_data.pub_socket_inproc_url)
172                .map_err(|_e| EngineError::PluginPubSocketError(plugin.get_id()))
173                .unwrap();
174            debug!("plugin {} connected to pub socket.", plugin.get_id());
175
176            // Create the socket that plugin will use to subscribe to events
177            let sub_socket = context_clone
178                .socket(zmq::SUB)
179                .map_err(|_e| EngineError::PluginSubSocketError(plugin.get_id()))
180                .unwrap();
181            sub_socket
182                .connect(&socket_data.sub_socket_inproc_url)
183                .map_err(|_e| EngineError::PluginPubSocketError(plugin.get_id()))
184                .unwrap();
185
186            // Subscribe only to events of interest for this plugin
187            for sub in plugin.get_subscriptions().unwrap() {
188                let filter = sub
189                    .get_filter()
190                    .map_err(|_e| EngineError::EngineSetSubFilterError())
191                    .unwrap();
192                debug!(
193                    "Engine setting subscription filter {:?} for plugin: {}",
194                    filter,
195                    plugin.get_id()
196                );
197                // TODO -- the following error handling doesn't work; compiler complains, "cannot return value referencing
198                // local data"; that is, we cannot return the EngineError..
199                // let filter = sub.get_filter()?;
200                sub_socket
201                    .set_subscribe(&filter)
202                    .map_err(|_e| {
203                        EngineError::PluginSubscriptionError(sub.get_name(), plugin.get_id())
204                    })
205                    .unwrap();
206            }
207            debug!(
208                "plugin {} connected to sub socket with subscriptions set.",
209                plugin.get_id()
210            );
211
212            // Create the sync socket that plugin will use to sync with engine and other plugins
213            let sync = context_clone
214                .socket(zmq::REQ)
215                .map_err(|_e| {
216                    EngineError::PluginSyncSocketError(
217                        plugin.get_id(),
218                        socket_data.sync_socket_port,
219                    )
220                })
221                .unwrap();
222            // connect the sync socket to the inproc URL.
223            // the URL must be different for EACH plugin. this is because the engine receives ALL sync ready messages
224            // from all plugins before sending any replies, and the zmq REQ-REP sockets only allow one request per
225            // reply.
226            // NOTE: since this sync object is for use in the plugin (running in a thread), it will always
227            // be used via inproc (threads use inproc).. therefore, we do not connect to the TCP URL.
228            let plugin_sync_socket_inproc_url =
229                format!("{}-{}", &socket_data.sync_inproc_url, plugin.get_id());
230
231            sync.connect(&plugin_sync_socket_inproc_url)
232                .map_err(|_e| {
233                    EngineError::PluginSyncSocketError(
234                        plugin.get_id(),
235                        socket_data.sync_socket_port,
236                    )
237                })
238                .unwrap();
239            debug!(
240                "plugin {} connected to sync socket at URL: {}.",
241                plugin.get_id(),
242                plugin_sync_socket_inproc_url
243            );
244
245            // connect to and send sync message on sync socket
246            let msg = "ready";
247            sync.send(msg, 0)
248                .expect("Could not send ready message on thread for plugin; crashing!");
249            debug!("plugin {} sent ready message.", plugin.get_id());
250
251            // TODO -- couldn't get this error handling to work...
252            // .map_err(|_e| EngineError::PluginSyncSendError(plugin.get_id()))?;
253
254            // blocking call to wait for reply from engine
255            let _msg = sync
256                .recv_msg(0)
257                .expect("plugin got error trying to receive sync reply; crashing!");
258
259            debug!(
260                "plugin {} received reply from ready message. Executing start function...",
261                plugin.get_id()
262            );
263
264            // now execute the actual plugin function
265            plugin.start(pub_socket, sub_socket).unwrap();
266        }).expect(&format!("Could not spawn new thread for plugin ({},{})", plugin_name, plugin_id));
267        Ok(thread_handle)
268    }
269
270    fn start_external_plugin(
271        &self,
272        context: &zmq::Context,
273        plugin: Arc<Box<dyn ExternalPlugin>>,
274    ) -> Result<JoinHandle<()>, EngineError> {
275        let socket_data = SocketData::default();
276
277        // we need to clone the zmq context so that the thread can take ownership and avoid static lifetime
278        // requirements on the original context object. note also that creating a brand new context within the
279        // thread does not work -- the inproc endpoints will not be shared.
280        let context_clone = context.clone();
281        let plugin_name = plugin.get_name();
282        let plugin_id = plugin.get_id();
283        let builder = thread::Builder::new().name(plugin.get_name());
284        let thread_handle = builder.spawn(move || {
285            debug!("external plugin ({}, {}) thread started.", plugin.get_name(), plugin.get_id());
286
287            // TODO -- in the blocked code that follows, we use a series of .unwrap()s to effectively crash
288            // the entire thread when we encounter an EngineError. Using the ? operator to "bubble" up the errors
289            // naturally does not work. We should explore alternatives though
290
291            // -------------------------------------------------------------------------------------------------
292
293            // Create the external socket used to communicate with the external plugin process
294            let external_socket = context_clone
295                .socket(zmq::REP)
296                .map_err(|e| EngineError::PluginExternalSocketError(plugin.get_id(), e))
297                .unwrap();
298            let external_tcp_url = format!("tcp://*:{}", plugin.get_tcp_port());
299
300            external_socket
301                .bind(&external_tcp_url)
302                .map_err(|e| EngineError::PluginExternalSocketError(plugin.get_id(), e))
303                .unwrap();
304            debug!("plugin bound to external TCP socket: {}", external_tcp_url);
305
306            // Create the socket that plugin will use to publish new events
307            let pub_socket = context_clone
308                .socket(zmq::PUB)
309                .map_err(|_e| EngineError::PluginPubSocketError(plugin.get_id()))
310                .unwrap();
311            pub_socket
312                .connect(&socket_data.pub_socket_inproc_url)
313                .map_err(|_e| EngineError::PluginPubSocketError(plugin.get_id()))
314                .unwrap();
315            debug!("plugin {} connected to pub socket.", plugin.get_id());
316
317            // Create the socket that plugin will use to subscribe to events
318            let sub_socket = context_clone
319                .socket(zmq::SUB)
320                .map_err(|_e| EngineError::PluginSubSocketError(plugin.get_id()))
321                .unwrap();
322            sub_socket
323                .connect(&socket_data.sub_socket_inproc_url)
324                .map_err(|_e| EngineError::PluginPubSocketError(plugin.get_id()))
325                .unwrap();
326
327            // Subscribe only to events of interest for this plugin
328            for sub in plugin.get_subscriptions().unwrap() {
329                let filter = sub
330                    .get_filter()
331                    .map_err(|_e| EngineError::EngineSetSubFilterError())
332                    .unwrap();
333                debug!("Engine setting subscription filter {:?}", filter);
334                // TODO -- the following error handling doesn't work; compiler complains, "cannot return value referencing
335                // local data"; that is, we cannot return the EngineError..
336                // let filter = sub.get_filter()?;
337                sub_socket
338                    .set_subscribe(&filter)
339                    .map_err(|_e| {
340                        EngineError::PluginSubscriptionError(sub.get_name(), plugin.get_id())
341                    })
342                    .unwrap();
343            }
344            debug!(
345                "external plugin {} connected to sub socket with subscriptions set.",
346                plugin.get_id()
347            );
348
349            // Create the sync socket that plugin will use to sync with engine and other plugins
350            let sync = context_clone
351                .socket(zmq::REQ)
352                .map_err(|_e| {
353                    EngineError::PluginSyncSocketError(
354                        plugin.get_id(),
355                        socket_data.sync_socket_port,
356                    )
357                })
358                .unwrap();
359            // connect the sync socket to the inproc URL.
360            // the URL must be different for EACH plugin. this is because the engine receives ALL sync ready messages
361            // from all plugins before sending any replies, and the zmq REQ-REP sockets only allow one request per
362            // reply.
363            // NOTE: since this sync object is for use in the plugin (running in a thread), it will always
364            // be used via inproc (threads use inproc).. therefore, we do not connect to the TCP URL.
365            let plugin_sync_socket_inproc_url =
366                format!("{}-{}", &socket_data.sync_inproc_url, plugin.get_id());
367
368            sync.connect(&plugin_sync_socket_inproc_url)
369                .map_err(|_e| {
370                    EngineError::PluginSyncSocketError(
371                        plugin.get_id(),
372                        socket_data.sync_socket_port,
373                    )
374                })
375                .unwrap();
376            debug!(
377                "plugin {} connected to sync socket at URL: {}.",
378                plugin.get_id(),
379                plugin_sync_socket_inproc_url
380            );
381
382            // connect to and send sync message on sync socket
383            let msg = "ready";
384            sync.send(msg, 0)
385                .expect("Could not send ready message on thread for plugin; crashing!");
386            debug!("plugin {} sent ready message.", plugin.get_id());
387
388            // TODO -- couldn't get this error handling to work...
389            // .map_err(|_e| EngineError::PluginSyncSendError(plugin.get_id()))?;
390
391            // blocking call to wait for reply from engine
392            let _msg = sync
393                .recv_msg(0)
394                .expect("plugin got error trying to receive sync reply; crashing!");
395
396            debug!(
397                "plugin {} received reply from ready message. Executing start function...",
398                plugin.get_id()
399            );
400
401            // now execute the actual plugin function
402            plugin
403                .start(pub_socket, sub_socket, external_socket)
404                .unwrap();
405        }).expect(&format!("Could not spawn new thread for external plugin ({},{})", plugin_name, plugin_id));
406
407        Ok(thread_handle)
408    }
409
410    /// this function synchronizes all plugins to handle plugins that might start up more slowly than others.
411    /// it utilizes a set of REQ-REP zmq sockets -- one for each plugin.
412    /// the basic algorithm is:
413    ///   1) wait to receive ready messages from all plugins; it does this by doing a recv on each socket
414    ///   2) send an OK to all plugins
415    /// note: that we must use different sync sockets since the zmq REQ-REP socket only allows for the receipt of
416    /// one message before sending a reply and we must recieve 'ready' messages from all plugins before replying.
417    /// note: this function currently DOES NOT sync external plugins. this is left as a TODO.
418    fn sync_plugins(&self, context: &zmq::Context) -> Result<(), EngineError> {
419        let socket_data = SocketData::default();
420        // set of all sync sockets engine will use
421        let mut sync_sockets = Vec::<zmq::Socket>::new();
422
423        // iterate through each plugin, creating a sync socket for it using its id, and waiting
424        // for a ready message. all we need for this is the plugin_id, so we first build a vector of all
425        // plugin_id's, internal and external.
426        let mut plugin_ids: Vec<Uuid> = self.plugins.iter().map(|x| x.get_id()).collect();
427        let mut external_plugin_ids: Vec<Uuid> =
428            self.external_plugins.iter().map(|x| x.get_id()).collect();
429        plugin_ids.append(&mut external_plugin_ids);
430        debug!("Engine will now sync these plugins: {:?}", plugin_ids);
431
432        for plugin_id in &plugin_ids {
433            let sync_socket = context
434                .socket(zmq::REP)
435                .map_err(|_e| EngineError::EngineSyncSocketCreateError())?;
436
437            // bind sync socket to inproc URL
438            let plugin_sync_socket_inproc_url =
439                format!("{}-{}", &socket_data.sync_inproc_url, plugin_id);
440            debug!(
441                "Engine binding to sync inproc URL: {}",
442                &plugin_sync_socket_inproc_url
443            );
444            sync_socket
445                .bind(&plugin_sync_socket_inproc_url)
446                .map_err(|_e| {
447                    EngineError::EngineSyncSocketInprocBindError(plugin_sync_socket_inproc_url)
448                })?;
449            // receive ready message from plugin
450            let _msg = sync_socket
451                .recv_msg(0)
452                .map_err(EngineError::EngineSyncSocketMsgRcvError)?;
453            debug!("Engine received a ready message from plugin {}", plugin_id);
454            sync_sockets.push(sync_socket);
455        }
456
457        debug!("Engine received all ready messages; now sending replies.");
458
459        // send a reply to all plugins
460        let mut msg_sent = 0;
461        while msg_sent < plugin_ids.len() {
462            let reply = "ok";
463            let sync_socket = sync_sockets
464                .pop()
465                .ok_or(EngineError::EngineSyncSocketPopError())?;
466            sync_socket
467                .send(reply, 0)
468                .map_err(EngineError::EngineSyncSocketSendRcvError)?;
469            msg_sent += 1;
470            debug!("Engine sent a reply");
471        }
472        debug!("All plugins have been synced");
473
474        Ok(())
475    }
476
477    fn start_plugins(&self) -> Result<Vec<JoinHandle<()>>, EngineError> {
478        // call start_plugin with the zmq context and the config for each plugin,
479        // as defined in the PLUGINS constant
480        let mut thread_handles = vec![];
481        for plugin in &self.plugins {
482            let p = Arc::clone(plugin);
483            thread_handles.push(self.start_plugin(&self.context, p)?);
484        }
485
486        Ok(thread_handles)
487    }
488
489    fn start_external_plugins(&self) -> Result<Vec<JoinHandle<()>>, EngineError> {
490        // call start_plugin with the zmq context and the config for each plugin,
491        // as defined in the PLUGINS constant
492        let mut thread_handles = vec![];
493        for plugin in &self.external_plugins {
494            let p = Arc::clone(plugin);
495            thread_handles.push(self.start_external_plugin(&self.context, p)?);
496        }
497        Ok(thread_handles)
498    }
499
500    /// Add a new plugin to the app
501    pub fn register_plugin(mut self, plugin: Arc<Box<dyn Plugin>>) -> Self {
502        self.plugins.push(plugin);
503        self
504    }
505
506    /// Add zero or more new plugins to the app (non-chainable)
507    pub fn register_plugins(&mut self, plugins: Vec<Arc<Box<dyn Plugin>>>){
508        for plugin in plugins {
509            self.plugins.push(plugin);
510        }
511    }
512
513    /// Add a new external plugin to the app
514    pub fn register_external_plugin(mut self, plugin: Arc<Box<dyn ExternalPlugin>>) -> Self {
515        self.external_plugins.push(plugin);
516        self
517    }
518
519    /// Add zero or more new external plugins to the app (non-chainable)
520    pub fn register_external_plugins(&mut self, plugins: Vec<Arc<Box<dyn ExternalPlugin>>>) {
521        for plugin in plugins {
522            self.external_plugins.push(plugin);
523        }
524    }
525
526    pub fn run(self) -> Result<(), EngineError> {
527        info!("Engine starting application with {} plugins and {} external plugins on publish port: {} and subscribe port: {}.", self.plugins.len(), self.external_plugins.len(), self.app_config.publish_port, self.app_config.subscribe_port);
528        // incoming and outgoing sockets for the engine
529        let outgoing = self.get_outgoing_socket()?;
530        let incoming = self.get_incoming_socket()?;
531
532        info!("Engine starting zmq proxy.");
533
534        let _proxy_thread = thread::spawn(move || {
535            let _result = zmq::proxy(&incoming, &outgoing)
536                .expect("Engine got error running proxy; socket was closed?");
537        });
538
539        debug!("Engine has started proxy thread. Will now start and sync plugins");
540
541        // start plugins in their own thread
542        let plugin_thread_handles = self.start_plugins()?;
543
544        // start external plugins
545        let external_plugin_thread_handles = self.start_external_plugins()?;
546
547        // sync all plugins (internal and external)
548        self.sync_plugins(&self.context)?;
549
550        info!("All plugins started and synced; Will now wait for plugins to exit...");
551
552        // join all of the plugin threads, and when they are all complete, we can kill the entire
553        // program
554        for h in plugin_thread_handles {
555            h.join().unwrap();
556        }
557
558        info!(
559            "Engine joined all internal plugin threads; will now join external plugin threads."
560        );
561        for h in external_plugin_thread_handles {
562            h.join().unwrap();
563        }
564        // join all external plugins as well; if we don't there are still race conditions because external
565        // plugins could be slow to start and the main program could quit before they have received messages
566
567        info!("Engine joined all plugin threads.. ready to shut down.");
568        // all plugins have exited so let's kill the proxy thread now
569        // TODO -- is there a way to shut down the proxy thread using the handle? if we just exit
570        // here without terminating it, is that ok?
571
572        Ok(())
573    }
574}
575
576#[cfg(test)]
577mod tests {
578
579    use std::{str, sync::Arc, vec};
580
581    use log::{info, debug};
582
583    use zmq::Socket;
584
585    use crate::{
586        events::{Event, EventType},
587        plugins::Plugin,
588        App,
589    };
590
591    // Here we provide two simple, example event types. TypeA, which has a single string field,
592    // and TypeB which has a single integer field.
593    struct TypeAEventType {}
594    impl EventType for TypeAEventType {
595        fn get_name(&self) -> String {
596            let s = "TypeA";
597            s.to_string()
598        }
599
600        fn get_filter(&self) -> Result<Vec<u8>, crate::errors::EngineError> {
601            // just return the bytes associated with the name.
602            Ok(self.get_name().as_bytes().to_vec())
603        }
604    }
605
606    // Example event for event type TypeA
607    struct TypeAEvent {
608        message: String,
609    }
610
611    impl Event for TypeAEvent {
612        fn to_bytes(&self) -> Result<Vec<u8>, crate::errors::EngineError> {
613            let type_a = TypeAEventType {};
614            // The byte array begins with the filter and then adds the message
615            let result = [
616                type_a.get_filter().unwrap(),
617                self.message.as_bytes().to_vec(),
618            ]
619            .concat();
620            Ok(result)
621        }
622
623        fn from_bytes(mut b: Vec<u8>) -> Result<TypeAEvent, Box<(dyn std::error::Error + 'static)>> {
624            // remove the first 5 bytes which are the message type
625            for _i in 1..5 {
626                b.remove(0);
627            }
628            let msg = str::from_utf8(&b).unwrap();
629            Ok(TypeAEvent {
630                message: msg.to_string(),
631            })
632        }
633    }
634
635    // The second event type, TypeB.
636    struct TypeBEventType {}
637    impl EventType for TypeBEventType {
638        fn get_name(&self) -> String {
639            let s = "TypeB";
640            s.to_string()
641        }
642
643        fn get_filter(&self) -> Result<Vec<u8>, crate::errors::EngineError> {
644            // just return the bytes associated with the name.
645            Ok(self.get_name().as_bytes().to_vec())
646        }
647    }
648
649    // Event for event type TypeB
650    struct TypeBEvent {
651        count: usize,
652    }
653
654    impl Event for TypeBEvent {
655        fn to_bytes(&self) -> Result<Vec<u8>, crate::errors::EngineError> {
656            // this is a TypeB event
657            let type_b = TypeBEventType {};
658            let message = format!("{}", self.count);
659            // The byte array begins with the filter and then adds the message
660            let result = [type_b.get_filter().unwrap(), message.as_bytes().to_vec()].concat();
661            Ok(result)
662        }
663
664        fn from_bytes(mut b: Vec<u8>) -> Result<TypeBEvent, Box<(dyn std::error::Error + 'static)>> {
665            // remove the first 5 bytes which are the message type
666            for _i in 0..5 {
667                b.remove(0);
668            }
669            let msg = str::from_utf8(&b).unwrap();
670            Ok(TypeBEvent {
671                count: msg.to_string().parse().unwrap(),
672            })
673        }
674    }
675
676    // Plugin examples.
677    // Example of a "message producer" plugin. This plugin produces 5 strings and sends them as TypeA events.
678    // It sends the 5 events as fast as possible.
679    // It also subscribes to TypeB events, which are sent by the "counter" pluging in response to TypeA evnts.
680    // After sending its 5 events, it then receives all of the TypeB events (there should b exactly 5).
681    struct MsgProducerPlugin {
682        id: uuid::Uuid,
683    }
684    impl MsgProducerPlugin {
685        fn new() -> Self {
686            MsgProducerPlugin {
687                id: uuid::Uuid::new_v4(),
688            }
689        }
690    }
691    impl Plugin for MsgProducerPlugin {
692        fn get_name(&self) -> String {
693            "MsgProducerPlugin".to_string()
694        }
695        fn start(
696            &self,
697            pub_socket: Socket,
698            sub_socket: Socket,
699        ) -> Result<(), crate::errors::EngineError> {
700            info!(
701                "MsgProducer (plugin id {}) start function starting...",
702                self.get_id()
703            );
704
705            // send 5 messages
706            let mut total_messages_sent = 0;
707            while total_messages_sent < 5 {
708                let message = format!("This is message {}", total_messages_sent);
709                let m = TypeAEvent { message };
710                let data = m.to_bytes().unwrap();
711                debug!("MsgProducer sending bytes: {:?}", data);
712                pub_socket.send(data, 0).unwrap();
713                total_messages_sent += 1;
714                debug!(
715                    "MsgProducer sent TypeA event message: {}",
716                    total_messages_sent
717                );
718            }
719            info!("MsgProducer has sent all TypeA event messages, now waiting to receive TypeB events");
720
721            // now get the TypeB events
722            let mut total_messages_read = 0;
723            while total_messages_read < 5 {
724                // get the bytes of a new message; it should be of TypeB
725                let b = sub_socket.recv_bytes(0).unwrap();
726                debug!("MsgProducer received TypeB message; bytes: {:?}", b);
727                let event_msg = TypeBEvent::from_bytes(b).unwrap();
728                let count = event_msg.count;
729                debug!("Got a type B message; count was: {}", count);
730                total_messages_read += 1;
731                debug!(
732                    "MsgProducer received TypeB event message: {}",
733                    total_messages_read
734                );
735            }
736            info!("MsgProducer has received all TypeB event messages; now exiting.");
737
738            Ok(())
739        }
740
741        fn get_subscriptions(&self) -> Result<Vec<Box<dyn EventType>>, crate::errors::EngineError> {
742            Ok(vec![Box::new(TypeBEventType {})])
743        }
744
745        fn get_id(&self) -> uuid::Uuid {
746            self.id
747        }
748    }
749
750    // Example of a "counter" plugin. This plugin subscribes to TypeA events and computes the character count in the message. It then
751    // produces a typeB event with the character count it computed.
752
753    struct CounterPlugin {
754        id: uuid::Uuid,
755    }
756    impl CounterPlugin {
757        fn new() -> Self {
758            CounterPlugin {
759                id: uuid::Uuid::new_v4(),
760            }
761        }
762    }
763    impl Plugin for CounterPlugin {
764        fn get_name(&self) -> String {
765            "CounterPlugin".to_string()
766        }
767
768        fn start(
769            &self,
770            pub_socket: Socket,
771            sub_socket: Socket,
772        ) -> Result<(), crate::errors::EngineError> {
773            info!(
774                "Counter (plugin id {}) start function starting...",
775                self.get_id()
776            );
777            // compute the counts of the first 5 messages
778            let mut total_messages_read = 0;
779            while total_messages_read < 5 {
780                // get the bytes of a new message; it should be of TypeA
781                let b = sub_socket.recv_bytes(0).unwrap();
782                let event_msg = TypeAEvent::from_bytes(b).unwrap();
783                let count = event_msg.message.len();
784                total_messages_read += 1;
785                debug!(
786                    "Counter plugin received TypeA message: {}",
787                    total_messages_read
788                );
789                // send a TypeB event
790                let m = TypeBEvent { count };
791                let data = m.to_bytes().unwrap();
792                pub_socket.send(data, 0).unwrap();
793                debug!("Counter plugin sent TypeB message: {}", total_messages_read);
794            }
795            info!("Counter plugin has sent all TypeB messages; now exiting.");
796
797            Ok(())
798        }
799
800        fn get_subscriptions(&self) -> Result<Vec<Box<dyn EventType>>, crate::errors::EngineError> {
801            Ok(vec![Box::new(TypeAEventType {})])
802        }
803
804        fn get_id(&self) -> uuid::Uuid {
805            self.id
806        }
807    }
808
809    // basic test that we can register plugins and execute app.run() and that the program terminates.
810    #[test]
811    fn test_run_app() -> Result<(), String> {
812        // the plugins for our app
813        info!("Top of the test_run_app");
814        let msg_producer = MsgProducerPlugin::new();
815        let counter = CounterPlugin::new();
816        info!("plugins for test_run_app configured");
817        let app: App = App::new(5559, 5560);
818        app.register_plugin(Arc::new(Box::new(msg_producer)))
819            .register_plugin(Arc::new(Box::new(counter)))
820            // .register_external_plugin(publish_port, subscribe_port)
821            .run()
822            .map_err(|e| format!("Got error from Engine! Details: {}", e))?;
823        info!("returned from test_run_app.run()");
824        Ok(())
825    }
826
827    // ----- event filters -----
828    // test that plugin gets the messages that it subscribed to
829
830    // test that plugin does NOT get messages it did not subscribe to
831
832    // ----- event to_bytes() and from_bytes() -----
833    // test that a message received is byte-for-byte identical to the message that was sent
834}