1use std::{
18 sync::Arc,
19 thread::{self, JoinHandle},
20};
21
22use errors::EngineError;
23use log::{info, debug};
24use plugins::{ExternalPlugin, Plugin};
25use uuid::Uuid;
26use zmq::{Context, Socket};
27
28pub mod errors;
29pub mod events;
30pub mod plugins;
31
/// TCP port configuration for the engine's two externally reachable sockets.
///
/// `App::new` fills both fields from its arguments; `App::default` uses
/// 5559 for `publish_port` and 5560 for `subscribe_port`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppConfig {
    /// Port the engine's incoming (SUB) socket is bound to (`tcp://*:<publish_port>`).
    pub publish_port: i32,
    /// Port the engine's outgoing (PUB) socket is bound to (`tcp://*:<subscribe_port>`).
    pub subscribe_port: i32,
}
37
/// Fixed addresses used for the engine's in-process wiring.
struct SocketData {
    /// inproc endpoint that plugin PUB sockets connect to and the engine's
    /// incoming socket binds.
    pub_socket_inproc_url: String,
    /// inproc endpoint that plugin SUB sockets connect to and the engine's
    /// outgoing socket binds.
    sub_socket_inproc_url: String,
    /// Port number attached to plugin sync-socket errors.
    sync_socket_port: i32,
    /// Base of the per-plugin sync endpoint; the full URL is `<base>-<plugin id>`.
    sync_inproc_url: String,
}

impl Default for SocketData {
    /// Returns the hard-coded endpoints used throughout the engine.
    fn default() -> Self {
        SocketData {
            pub_socket_inproc_url: String::from("inproc://messages"),
            sub_socket_inproc_url: String::from("inproc://events"),
            sync_socket_port: 5000,
            sync_inproc_url: String::from("inproc://sync"),
        }
    }
}
56
57pub struct App {
59 pub plugins: Vec<Arc<Box<dyn Plugin>>>,
60 pub external_plugins: Vec<Arc<Box<dyn ExternalPlugin>>>,
61 pub app_config: AppConfig,
62 pub context: Context,
63}
64
65impl Default for App {
66 fn default() -> Self {
67 App {
68 plugins: vec![],
69 external_plugins: vec![],
70 app_config: AppConfig {
71 publish_port: 5559,
72 subscribe_port: 5560,
73 },
74 context: Context::new(),
75 }
76 }
77}
78
79impl App {
80 pub fn new(publish_port: i32, subscribe_port: i32) -> Self {
81 let app_config = AppConfig {
82 publish_port,
83 subscribe_port,
84 };
85 App {
86 app_config,
87 ..Default::default()
88 }
89 }
90
91 fn get_outgoing_socket(&self) -> Result<Socket, EngineError> {
94 let socket_data = SocketData::default();
95 let sub_socket_tcp_url = format!("tcp://*:{}", self.app_config.subscribe_port);
96 let outgoing = self.context.socket(zmq::PUB)?;
98 outgoing.bind(&sub_socket_tcp_url).map_err(|e| {
100 EngineError::SubSocketTCPBindError(self.app_config.subscribe_port.to_string(), e)
101 })?;
102 outgoing
104 .bind(&socket_data.sub_socket_inproc_url)
105 .map_err(|_e| {
106 EngineError::SubSocketInProcBindError(socket_data.sub_socket_inproc_url.to_string())
107 })?;
108 Ok(outgoing)
109 }
110
111 fn get_incoming_socket(&self) -> Result<Socket, EngineError> {
113 let socket_data = SocketData::default();
114 let pub_socket_tcp_url = format!("tcp://*:{}", self.app_config.publish_port);
115 let incoming = self.context.socket(zmq::SUB)?;
116 incoming.bind(&pub_socket_tcp_url).map_err(|_e| {
118 EngineError::PubSocketTCPBindError(self.app_config.subscribe_port.to_string())
119 })?;
120 incoming
122 .bind(&socket_data.pub_socket_inproc_url)
123 .map_err(|_e| {
124 EngineError::PubSocketInProcBindError(socket_data.pub_socket_inproc_url.to_string())
125 })?;
126 let filter = String::new();
128 incoming
129 .set_subscribe(filter.as_bytes())
130 .map_err(EngineError::EngineSetSubFilterAllError)?;
131 Ok(incoming)
132 }
133
134 fn start_plugin(
143 &self,
144 context: &zmq::Context,
145 plugin: Arc<Box<dyn Plugin>>,
146 ) -> Result<JoinHandle<()>, EngineError> {
147 let socket_data = SocketData::default();
148
149 let context_clone = context.clone();
153 let plugin_name = plugin.get_name();
154 let plugin_id = plugin.get_id();
155 let builder = thread::Builder::new().name(plugin.get_name());
156 let thread_handle = builder.spawn(move || {
157 debug!("plugin ({}, {}) thread started.", plugin.get_name(), plugin.get_id());
158
159 let pub_socket = context_clone
167 .socket(zmq::PUB)
168 .map_err(|_e| EngineError::PluginPubSocketError(plugin.get_id()))
169 .unwrap();
170 pub_socket
171 .connect(&socket_data.pub_socket_inproc_url)
172 .map_err(|_e| EngineError::PluginPubSocketError(plugin.get_id()))
173 .unwrap();
174 debug!("plugin {} connected to pub socket.", plugin.get_id());
175
176 let sub_socket = context_clone
178 .socket(zmq::SUB)
179 .map_err(|_e| EngineError::PluginSubSocketError(plugin.get_id()))
180 .unwrap();
181 sub_socket
182 .connect(&socket_data.sub_socket_inproc_url)
183 .map_err(|_e| EngineError::PluginPubSocketError(plugin.get_id()))
184 .unwrap();
185
186 for sub in plugin.get_subscriptions().unwrap() {
188 let filter = sub
189 .get_filter()
190 .map_err(|_e| EngineError::EngineSetSubFilterError())
191 .unwrap();
192 debug!(
193 "Engine setting subscription filter {:?} for plugin: {}",
194 filter,
195 plugin.get_id()
196 );
197 sub_socket
201 .set_subscribe(&filter)
202 .map_err(|_e| {
203 EngineError::PluginSubscriptionError(sub.get_name(), plugin.get_id())
204 })
205 .unwrap();
206 }
207 debug!(
208 "plugin {} connected to sub socket with subscriptions set.",
209 plugin.get_id()
210 );
211
212 let sync = context_clone
214 .socket(zmq::REQ)
215 .map_err(|_e| {
216 EngineError::PluginSyncSocketError(
217 plugin.get_id(),
218 socket_data.sync_socket_port,
219 )
220 })
221 .unwrap();
222 let plugin_sync_socket_inproc_url =
229 format!("{}-{}", &socket_data.sync_inproc_url, plugin.get_id());
230
231 sync.connect(&plugin_sync_socket_inproc_url)
232 .map_err(|_e| {
233 EngineError::PluginSyncSocketError(
234 plugin.get_id(),
235 socket_data.sync_socket_port,
236 )
237 })
238 .unwrap();
239 debug!(
240 "plugin {} connected to sync socket at URL: {}.",
241 plugin.get_id(),
242 plugin_sync_socket_inproc_url
243 );
244
245 let msg = "ready";
247 sync.send(msg, 0)
248 .expect("Could not send ready message on thread for plugin; crashing!");
249 debug!("plugin {} sent ready message.", plugin.get_id());
250
251 let _msg = sync
256 .recv_msg(0)
257 .expect("plugin got error trying to receive sync reply; crashing!");
258
259 debug!(
260 "plugin {} received reply from ready message. Executing start function...",
261 plugin.get_id()
262 );
263
264 plugin.start(pub_socket, sub_socket).unwrap();
266 }).expect(&format!("Could not spawn new thread for plugin ({},{})", plugin_name, plugin_id));
267 Ok(thread_handle)
268 }
269
270 fn start_external_plugin(
271 &self,
272 context: &zmq::Context,
273 plugin: Arc<Box<dyn ExternalPlugin>>,
274 ) -> Result<JoinHandle<()>, EngineError> {
275 let socket_data = SocketData::default();
276
277 let context_clone = context.clone();
281 let plugin_name = plugin.get_name();
282 let plugin_id = plugin.get_id();
283 let builder = thread::Builder::new().name(plugin.get_name());
284 let thread_handle = builder.spawn(move || {
285 debug!("external plugin ({}, {}) thread started.", plugin.get_name(), plugin.get_id());
286
287 let external_socket = context_clone
295 .socket(zmq::REP)
296 .map_err(|e| EngineError::PluginExternalSocketError(plugin.get_id(), e))
297 .unwrap();
298 let external_tcp_url = format!("tcp://*:{}", plugin.get_tcp_port());
299
300 external_socket
301 .bind(&external_tcp_url)
302 .map_err(|e| EngineError::PluginExternalSocketError(plugin.get_id(), e))
303 .unwrap();
304 debug!("plugin bound to external TCP socket: {}", external_tcp_url);
305
306 let pub_socket = context_clone
308 .socket(zmq::PUB)
309 .map_err(|_e| EngineError::PluginPubSocketError(plugin.get_id()))
310 .unwrap();
311 pub_socket
312 .connect(&socket_data.pub_socket_inproc_url)
313 .map_err(|_e| EngineError::PluginPubSocketError(plugin.get_id()))
314 .unwrap();
315 debug!("plugin {} connected to pub socket.", plugin.get_id());
316
317 let sub_socket = context_clone
319 .socket(zmq::SUB)
320 .map_err(|_e| EngineError::PluginSubSocketError(plugin.get_id()))
321 .unwrap();
322 sub_socket
323 .connect(&socket_data.sub_socket_inproc_url)
324 .map_err(|_e| EngineError::PluginPubSocketError(plugin.get_id()))
325 .unwrap();
326
327 for sub in plugin.get_subscriptions().unwrap() {
329 let filter = sub
330 .get_filter()
331 .map_err(|_e| EngineError::EngineSetSubFilterError())
332 .unwrap();
333 debug!("Engine setting subscription filter {:?}", filter);
334 sub_socket
338 .set_subscribe(&filter)
339 .map_err(|_e| {
340 EngineError::PluginSubscriptionError(sub.get_name(), plugin.get_id())
341 })
342 .unwrap();
343 }
344 debug!(
345 "external plugin {} connected to sub socket with subscriptions set.",
346 plugin.get_id()
347 );
348
349 let sync = context_clone
351 .socket(zmq::REQ)
352 .map_err(|_e| {
353 EngineError::PluginSyncSocketError(
354 plugin.get_id(),
355 socket_data.sync_socket_port,
356 )
357 })
358 .unwrap();
359 let plugin_sync_socket_inproc_url =
366 format!("{}-{}", &socket_data.sync_inproc_url, plugin.get_id());
367
368 sync.connect(&plugin_sync_socket_inproc_url)
369 .map_err(|_e| {
370 EngineError::PluginSyncSocketError(
371 plugin.get_id(),
372 socket_data.sync_socket_port,
373 )
374 })
375 .unwrap();
376 debug!(
377 "plugin {} connected to sync socket at URL: {}.",
378 plugin.get_id(),
379 plugin_sync_socket_inproc_url
380 );
381
382 let msg = "ready";
384 sync.send(msg, 0)
385 .expect("Could not send ready message on thread for plugin; crashing!");
386 debug!("plugin {} sent ready message.", plugin.get_id());
387
388 let _msg = sync
393 .recv_msg(0)
394 .expect("plugin got error trying to receive sync reply; crashing!");
395
396 debug!(
397 "plugin {} received reply from ready message. Executing start function...",
398 plugin.get_id()
399 );
400
401 plugin
403 .start(pub_socket, sub_socket, external_socket)
404 .unwrap();
405 }).expect(&format!("Could not spawn new thread for external plugin ({},{})", plugin_name, plugin_id));
406
407 Ok(thread_handle)
408 }
409
410 fn sync_plugins(&self, context: &zmq::Context) -> Result<(), EngineError> {
419 let socket_data = SocketData::default();
420 let mut sync_sockets = Vec::<zmq::Socket>::new();
422
423 let mut plugin_ids: Vec<Uuid> = self.plugins.iter().map(|x| x.get_id()).collect();
427 let mut external_plugin_ids: Vec<Uuid> =
428 self.external_plugins.iter().map(|x| x.get_id()).collect();
429 plugin_ids.append(&mut external_plugin_ids);
430 debug!("Engine will now sync these plugins: {:?}", plugin_ids);
431
432 for plugin_id in &plugin_ids {
433 let sync_socket = context
434 .socket(zmq::REP)
435 .map_err(|_e| EngineError::EngineSyncSocketCreateError())?;
436
437 let plugin_sync_socket_inproc_url =
439 format!("{}-{}", &socket_data.sync_inproc_url, plugin_id);
440 debug!(
441 "Engine binding to sync inproc URL: {}",
442 &plugin_sync_socket_inproc_url
443 );
444 sync_socket
445 .bind(&plugin_sync_socket_inproc_url)
446 .map_err(|_e| {
447 EngineError::EngineSyncSocketInprocBindError(plugin_sync_socket_inproc_url)
448 })?;
449 let _msg = sync_socket
451 .recv_msg(0)
452 .map_err(EngineError::EngineSyncSocketMsgRcvError)?;
453 debug!("Engine received a ready message from plugin {}", plugin_id);
454 sync_sockets.push(sync_socket);
455 }
456
457 debug!("Engine received all ready messages; now sending replies.");
458
459 let mut msg_sent = 0;
461 while msg_sent < plugin_ids.len() {
462 let reply = "ok";
463 let sync_socket = sync_sockets
464 .pop()
465 .ok_or(EngineError::EngineSyncSocketPopError())?;
466 sync_socket
467 .send(reply, 0)
468 .map_err(EngineError::EngineSyncSocketSendRcvError)?;
469 msg_sent += 1;
470 debug!("Engine sent a reply");
471 }
472 debug!("All plugins have been synced");
473
474 Ok(())
475 }
476
477 fn start_plugins(&self) -> Result<Vec<JoinHandle<()>>, EngineError> {
478 let mut thread_handles = vec![];
481 for plugin in &self.plugins {
482 let p = Arc::clone(plugin);
483 thread_handles.push(self.start_plugin(&self.context, p)?);
484 }
485
486 Ok(thread_handles)
487 }
488
489 fn start_external_plugins(&self) -> Result<Vec<JoinHandle<()>>, EngineError> {
490 let mut thread_handles = vec![];
493 for plugin in &self.external_plugins {
494 let p = Arc::clone(plugin);
495 thread_handles.push(self.start_external_plugin(&self.context, p)?);
496 }
497 Ok(thread_handles)
498 }
499
500 pub fn register_plugin(mut self, plugin: Arc<Box<dyn Plugin>>) -> Self {
502 self.plugins.push(plugin);
503 self
504 }
505
506 pub fn register_plugins(&mut self, plugins: Vec<Arc<Box<dyn Plugin>>>){
508 for plugin in plugins {
509 self.plugins.push(plugin);
510 }
511 }
512
513 pub fn register_external_plugin(mut self, plugin: Arc<Box<dyn ExternalPlugin>>) -> Self {
515 self.external_plugins.push(plugin);
516 self
517 }
518
519 pub fn register_external_plugins(&mut self, plugins: Vec<Arc<Box<dyn ExternalPlugin>>>) {
521 for plugin in plugins {
522 self.external_plugins.push(plugin);
523 }
524 }
525
526 pub fn run(self) -> Result<(), EngineError> {
527 info!("Engine starting application with {} plugins and {} external plugins on publish port: {} and subscribe port: {}.", self.plugins.len(), self.external_plugins.len(), self.app_config.publish_port, self.app_config.subscribe_port);
528 let outgoing = self.get_outgoing_socket()?;
530 let incoming = self.get_incoming_socket()?;
531
532 info!("Engine starting zmq proxy.");
533
534 let _proxy_thread = thread::spawn(move || {
535 let _result = zmq::proxy(&incoming, &outgoing)
536 .expect("Engine got error running proxy; socket was closed?");
537 });
538
539 debug!("Engine has started proxy thread. Will now start and sync plugins");
540
541 let plugin_thread_handles = self.start_plugins()?;
543
544 let external_plugin_thread_handles = self.start_external_plugins()?;
546
547 self.sync_plugins(&self.context)?;
549
550 info!("All plugins started and synced; Will now wait for plugins to exit...");
551
552 for h in plugin_thread_handles {
555 h.join().unwrap();
556 }
557
558 info!(
559 "Engine joined all internal plugin threads; will now join external plugin threads."
560 );
561 for h in external_plugin_thread_handles {
562 h.join().unwrap();
563 }
564 info!("Engine joined all plugin threads.. ready to shut down.");
568 Ok(())
573 }
574}
575
576#[cfg(test)]
577mod tests {
578
579 use std::{str, sync::Arc, vec};
580
581 use log::{info, debug};
582
583 use zmq::Socket;
584
585 use crate::{
586 events::{Event, EventType},
587 plugins::Plugin,
588 App,
589 };
590
591 struct TypeAEventType {}
594 impl EventType for TypeAEventType {
595 fn get_name(&self) -> String {
596 let s = "TypeA";
597 s.to_string()
598 }
599
600 fn get_filter(&self) -> Result<Vec<u8>, crate::errors::EngineError> {
601 Ok(self.get_name().as_bytes().to_vec())
603 }
604 }
605
606 struct TypeAEvent {
608 message: String,
609 }
610
611 impl Event for TypeAEvent {
612 fn to_bytes(&self) -> Result<Vec<u8>, crate::errors::EngineError> {
613 let type_a = TypeAEventType {};
614 let result = [
616 type_a.get_filter().unwrap(),
617 self.message.as_bytes().to_vec(),
618 ]
619 .concat();
620 Ok(result)
621 }
622
623 fn from_bytes(mut b: Vec<u8>) -> Result<TypeAEvent, Box<(dyn std::error::Error + 'static)>> {
624 for _i in 1..5 {
626 b.remove(0);
627 }
628 let msg = str::from_utf8(&b).unwrap();
629 Ok(TypeAEvent {
630 message: msg.to_string(),
631 })
632 }
633 }
634
635 struct TypeBEventType {}
637 impl EventType for TypeBEventType {
638 fn get_name(&self) -> String {
639 let s = "TypeB";
640 s.to_string()
641 }
642
643 fn get_filter(&self) -> Result<Vec<u8>, crate::errors::EngineError> {
644 Ok(self.get_name().as_bytes().to_vec())
646 }
647 }
648
649 struct TypeBEvent {
651 count: usize,
652 }
653
654 impl Event for TypeBEvent {
655 fn to_bytes(&self) -> Result<Vec<u8>, crate::errors::EngineError> {
656 let type_b = TypeBEventType {};
658 let message = format!("{}", self.count);
659 let result = [type_b.get_filter().unwrap(), message.as_bytes().to_vec()].concat();
661 Ok(result)
662 }
663
664 fn from_bytes(mut b: Vec<u8>) -> Result<TypeBEvent, Box<(dyn std::error::Error + 'static)>> {
665 for _i in 0..5 {
667 b.remove(0);
668 }
669 let msg = str::from_utf8(&b).unwrap();
670 Ok(TypeBEvent {
671 count: msg.to_string().parse().unwrap(),
672 })
673 }
674 }
675
676 struct MsgProducerPlugin {
682 id: uuid::Uuid,
683 }
684 impl MsgProducerPlugin {
685 fn new() -> Self {
686 MsgProducerPlugin {
687 id: uuid::Uuid::new_v4(),
688 }
689 }
690 }
691 impl Plugin for MsgProducerPlugin {
692 fn get_name(&self) -> String {
693 "MsgProducerPlugin".to_string()
694 }
695 fn start(
696 &self,
697 pub_socket: Socket,
698 sub_socket: Socket,
699 ) -> Result<(), crate::errors::EngineError> {
700 info!(
701 "MsgProducer (plugin id {}) start function starting...",
702 self.get_id()
703 );
704
705 let mut total_messages_sent = 0;
707 while total_messages_sent < 5 {
708 let message = format!("This is message {}", total_messages_sent);
709 let m = TypeAEvent { message };
710 let data = m.to_bytes().unwrap();
711 debug!("MsgProducer sending bytes: {:?}", data);
712 pub_socket.send(data, 0).unwrap();
713 total_messages_sent += 1;
714 debug!(
715 "MsgProducer sent TypeA event message: {}",
716 total_messages_sent
717 );
718 }
719 info!("MsgProducer has sent all TypeA event messages, now waiting to receive TypeB events");
720
721 let mut total_messages_read = 0;
723 while total_messages_read < 5 {
724 let b = sub_socket.recv_bytes(0).unwrap();
726 debug!("MsgProducer received TypeB message; bytes: {:?}", b);
727 let event_msg = TypeBEvent::from_bytes(b).unwrap();
728 let count = event_msg.count;
729 debug!("Got a type B message; count was: {}", count);
730 total_messages_read += 1;
731 debug!(
732 "MsgProducer received TypeB event message: {}",
733 total_messages_read
734 );
735 }
736 info!("MsgProducer has received all TypeB event messages; now exiting.");
737
738 Ok(())
739 }
740
741 fn get_subscriptions(&self) -> Result<Vec<Box<dyn EventType>>, crate::errors::EngineError> {
742 Ok(vec![Box::new(TypeBEventType {})])
743 }
744
745 fn get_id(&self) -> uuid::Uuid {
746 self.id
747 }
748 }
749
750 struct CounterPlugin {
754 id: uuid::Uuid,
755 }
756 impl CounterPlugin {
757 fn new() -> Self {
758 CounterPlugin {
759 id: uuid::Uuid::new_v4(),
760 }
761 }
762 }
763 impl Plugin for CounterPlugin {
764 fn get_name(&self) -> String {
765 "CounterPlugin".to_string()
766 }
767
768 fn start(
769 &self,
770 pub_socket: Socket,
771 sub_socket: Socket,
772 ) -> Result<(), crate::errors::EngineError> {
773 info!(
774 "Counter (plugin id {}) start function starting...",
775 self.get_id()
776 );
777 let mut total_messages_read = 0;
779 while total_messages_read < 5 {
780 let b = sub_socket.recv_bytes(0).unwrap();
782 let event_msg = TypeAEvent::from_bytes(b).unwrap();
783 let count = event_msg.message.len();
784 total_messages_read += 1;
785 debug!(
786 "Counter plugin received TypeA message: {}",
787 total_messages_read
788 );
789 let m = TypeBEvent { count };
791 let data = m.to_bytes().unwrap();
792 pub_socket.send(data, 0).unwrap();
793 debug!("Counter plugin sent TypeB message: {}", total_messages_read);
794 }
795 info!("Counter plugin has sent all TypeB messages; now exiting.");
796
797 Ok(())
798 }
799
800 fn get_subscriptions(&self) -> Result<Vec<Box<dyn EventType>>, crate::errors::EngineError> {
801 Ok(vec![Box::new(TypeAEventType {})])
802 }
803
804 fn get_id(&self) -> uuid::Uuid {
805 self.id
806 }
807 }
808
/// End-to-end test: runs the engine on ports 5559/5560 with one producer and
/// one counter plugin and expects `run` to return once both have exited.
#[test]
fn test_run_app() -> Result<(), String> {
    info!("Top of the test_run_app");
    let msg_producer = MsgProducerPlugin::new();
    let counter = CounterPlugin::new();
    info!("plugins for test_run_app configured");

    let app = App::new(5559, 5560)
        .register_plugin(Arc::new(Box::new(msg_producer)))
        .register_plugin(Arc::new(Box::new(counter)));
    if let Err(e) = app.run() {
        return Err(format!("Got error from Engine! Details: {}", e));
    }

    info!("returned from test_run_app.run()");
    Ok(())
}
826
827 }