event_engine/plugins.rs
1//! Plugins are independent components of the application that create and consume events. Plugins can be either
2//! internal, meaning they are written in Rust and run within the main application process as a child thread, or
3//! external, meaning they run in a separate process from the main application. Internal plugins use an inprocess
4//! communication mechanism to publish ans subscribe to events, while external plugins communicate with the main
5//! application over TCP.
6//!
7//! The following traits provide the contracts for writing internal ans external plugins.
8use crate::{errors::EngineError, events::EventType};
9use log::{debug, info, error};
10use uuid::Uuid;
11use zmq::Socket;
12
13/// Public API for defining the internal plugins. Internal plugns are plugins that run within the main application
14/// process in a child thread. In particular, all internal plugins must be written in Rust.
15pub trait Plugin: Sync + Send {
16 /// The entry point for the plugin. The engine will start the plugin in its own
17 /// thread and execute this function.
18 /// the pub_socket is used by the plugin to publish new events.
19 /// the sub_socket is used by the plugin to get events published by other plugins.
20 fn start(&self, pub_socket: Socket, sub_socket: Socket) -> Result<(), EngineError>;
21
22 /// Return the event subscriptions, as a vector of strings, that this plugin is interested
23 /// in.
24 fn get_subscriptions(&self) -> Result<Vec<Box<dyn EventType>>, EngineError>;
25
26 /// Returns the unique id for this plugin
27 fn get_id(&self) -> Uuid;
28
29 /// Returns the name for the plugin
30 fn get_name(&self) -> String;
31}
32
33/// Public API for defining external plugins. External plugins are plugins that will run in a separate process from the main
34/// application process.
35/// In particular, all plugins written in languages other than Rust will be external plugins. These external plugins
36/// still need to be registered on the main application.
37pub trait ExternalPlugin: Send + Sync {
38 /// A Rust start function for external plugins. This function provides an API for the external plugin process
39 /// to retrieve events and publish new events. It also supports commands, such as "quit" which will allow it
40 /// to shut down.
41 /// This API is exposed on a zmq REP socket.
42 fn start(
43 &self,
44 pub_socket: Socket,
45 sub_socket: Socket,
46 external_socket: Socket,
47 ) -> Result<(), EngineError> {
48 let plugin_id = self.get_id();
49 debug!("{}: top of start function for external plugin", plugin_id);
50 loop {
51 debug!("{}: waiting for next message", plugin_id);
52 // get the next message
53 let msg = external_socket
54 .recv_bytes(0)
55 .map_err(|e| EngineError::EngineExtSocketRcvError(plugin_id, e))?;
56 debug!("{}: got message from external plugin", plugin_id);
57
58 if msg.is_ascii() {
59 let msg_str_result = std::str::from_utf8(&msg);
60 let msg = match msg_str_result {
61 Ok(m) => m,
62 Err(e) => {
63 // don't let a bad command crash everything, just log the error and continue
64 error!("{}: Got unexpected message from plugin: could not encode to utf8; error: {} message bytes: {:?}", plugin_id, e, msg);
65 // always need to reply
66 external_socket
67 .send("event-engine: bad msg", 0)
68 .map_err(|e| EngineError::EngineExtSocketSendError(plugin_id, e))?;
69 continue;
70 }
71 };
72 if msg == "plugin_command: quit" {
73 info!("{}: got quit message; exiting", plugin_id);
74 break;
75 }
76 if msg == "plugin_command: next_msg" {
77 debug!(
78 "{}: got next_msg command, retrieving next message",
79 plugin_id
80 );
81 // get the next message and reply with it over the external socket
82 let next_msg = sub_socket
83 .recv_bytes(0)
84 .map_err(|e| EngineError::EngineExtSubSocketRcvError(plugin_id, e))?;
85 debug!(
86 "{}: got next message, sengding over external socket",
87 plugin_id
88 );
89 external_socket
90 .send(next_msg, 0)
91 .map_err(|e| EngineError::EngineExtSocketSendError(plugin_id, e))?;
92 debug!("{}: external message sent, loop complete", plugin_id);
93 continue;
94 }
95 }
96 // if we are here, we have not processed the message, so we assume it is a new event.
97 // we publish it on the pub socket and then reply
98 pub_socket
99 .send(msg, 0)
100 .map_err(|e| EngineError::EngineExtPubSocketSendError(plugin_id, e))?;
101 external_socket
102 .send("event-engine: msg published", 0)
103 .map_err(|e| EngineError::EngineExtSocketSendError(plugin_id, e))?;
104 }
105
106 Ok(())
107 }
108
109 /// the tcp port on which the external plugin will retrieve and publish events.
110 fn get_tcp_port(&self) -> i32;
111
112 /// Return the event subscriptions, as a vector of strings, that this plugin is interested
113 /// in.
114 fn get_subscriptions(&self) -> Result<Vec<Box<dyn EventType>>, EngineError>;
115
116 /// Returns the unique id for this plugin
117 fn get_id(&self) -> Uuid;
118
119 /// Returns the name for the plugin
120 fn get_name(&self) -> String;
121
122}