event_engine/
plugins.rs

1//! Plugins are independent components of the application that create and consume events. Plugins can be either
2//! internal, meaning they are written in Rust and run within the main application process as a child thread, or
3//! external, meaning they run in a separate process from the main application. Internal plugins use an inprocess
4//! communication mechanism to publish ans subscribe to events, while external plugins communicate with the main
5//! application over TCP.
6//!
7//! The following traits provide the contracts for writing internal ans external plugins.
8use crate::{errors::EngineError, events::EventType};
9use log::{debug, info, error};
10use uuid::Uuid;
11use zmq::Socket;
12
13/// Public API for defining the internal plugins. Internal plugns are plugins that run within the main application
14/// process in a child thread. In particular, all internal plugins must be written in Rust.
15pub trait Plugin: Sync + Send {
16    /// The entry point for the plugin. The engine will start the plugin in its own
17    /// thread and execute this function.
18    /// the pub_socket is used by the plugin to publish new events.
19    /// the sub_socket is used by the plugin to get events published by other plugins.
20    fn start(&self, pub_socket: Socket, sub_socket: Socket) -> Result<(), EngineError>;
21
22    /// Return the event subscriptions, as a vector of strings, that this plugin is interested
23    /// in.
24    fn get_subscriptions(&self) -> Result<Vec<Box<dyn EventType>>, EngineError>;
25
26    /// Returns the unique id for this plugin
27    fn get_id(&self) -> Uuid;
28
29    /// Returns the name for the plugin
30    fn get_name(&self) -> String;
31}
32
33/// Public API for defining external plugins. External plugins are plugins that will run in a separate process from the main
34/// application process.
35/// In particular, all plugins written in languages other than Rust will be external plugins. These external plugins
36/// still need to be registered on the main application.
37pub trait ExternalPlugin: Send + Sync {
38    /// A Rust start function for external plugins. This function provides an API for the external plugin process
39    /// to retrieve events and publish new events. It also supports commands, such as "quit" which will allow it
40    /// to shut down.
41    /// This API is exposed on a zmq REP socket.
42    fn start(
43        &self,
44        pub_socket: Socket,
45        sub_socket: Socket,
46        external_socket: Socket,
47    ) -> Result<(), EngineError> {
48        let plugin_id = self.get_id();
49        debug!("{}: top of start function for external plugin", plugin_id);
50        loop {
51            debug!("{}: waiting for next message", plugin_id);
52            // get the next message
53            let msg = external_socket
54                .recv_bytes(0)
55                .map_err(|e| EngineError::EngineExtSocketRcvError(plugin_id, e))?;
56            debug!("{}: got message from external plugin", plugin_id);
57
58            if msg.is_ascii() {
59                let msg_str_result = std::str::from_utf8(&msg);
60                let msg = match msg_str_result {
61                    Ok(m) => m,
62                    Err(e) => {
63                        // don't let a bad command crash everything, just log the error and continue
64                        error!("{}: Got unexpected message from plugin: could not encode to utf8; error: {} message bytes: {:?}", plugin_id, e, msg);
65                        // always need to reply
66                        external_socket
67                            .send("event-engine: bad msg", 0)
68                            .map_err(|e| EngineError::EngineExtSocketSendError(plugin_id, e))?;
69                        continue;
70                    }
71                };
72                if msg == "plugin_command: quit" {
73                    info!("{}: got quit message; exiting", plugin_id);
74                    break;
75                }
76                if msg == "plugin_command: next_msg" {
77                    debug!(
78                        "{}: got next_msg command, retrieving next message",
79                        plugin_id
80                    );
81                    // get the next message and reply with it over the external socket
82                    let next_msg = sub_socket
83                        .recv_bytes(0)
84                        .map_err(|e| EngineError::EngineExtSubSocketRcvError(plugin_id, e))?;
85                    debug!(
86                        "{}: got next message, sengding over external socket",
87                        plugin_id
88                    );
89                    external_socket
90                        .send(next_msg, 0)
91                        .map_err(|e| EngineError::EngineExtSocketSendError(plugin_id, e))?;
92                    debug!("{}: external message sent, loop complete", plugin_id);
93                    continue;
94                }
95            }
96            // if we are here, we have not processed the message, so we assume it is a new event.
97            // we publish it on the pub socket and then reply
98            pub_socket
99                .send(msg, 0)
100                .map_err(|e| EngineError::EngineExtPubSocketSendError(plugin_id, e))?;
101            external_socket
102                .send("event-engine: msg published", 0)
103                .map_err(|e| EngineError::EngineExtSocketSendError(plugin_id, e))?;
104        }
105
106        Ok(())
107    }
108
109    /// the tcp port on which the external plugin will retrieve and publish events.
110    fn get_tcp_port(&self) -> i32;
111
112    /// Return the event subscriptions, as a vector of strings, that this plugin is interested
113    /// in.
114    fn get_subscriptions(&self) -> Result<Vec<Box<dyn EventType>>, EngineError>;
115
116    /// Returns the unique id for this plugin
117    fn get_id(&self) -> Uuid;
118
119    /// Returns the name for the plugin
120    fn get_name(&self) -> String;
121
122}