1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use crate::{errors::EngineError, events::EventType};
use uuid::Uuid;
use zmq::Socket;

/// Public API for defining the "plugins" or independent components of an application.
/// A plugin is an application component that defines its own event subscriptions and runs
/// as a standalone thread. Both "internal" plugins and "external" plugins are supported.

/// API for internal plugins; i.e., plugins written in Rust and running in the main application
/// process.
///
/// Implementors must be `Sync + Send` (required by the trait bounds below).
pub trait Plugin: Sync + Send {
    /// The entry point for the plugin. The engine will start the plugin in its own
    /// thread and execute this function.
    ///
    /// * `pub_socket` is used by the plugin to publish new events.
    /// * `sub_socket` is used by the plugin to get events published by other plugins.
    ///
    /// Both sockets are passed by value, so the plugin owns them for the duration of the call.
    /// Returns `Ok(())` on success or an `EngineError` on failure.
    fn start(&self, pub_socket: Socket, sub_socket: Socket) -> Result<(), EngineError>;

    /// Returns the event subscriptions that this plugin is interested in, as a vector of
    /// boxed `EventType` trait objects, or an `EngineError` if they cannot be produced.
    fn get_subscriptions(&self) -> Result<Vec<Box<dyn EventType>>, EngineError>;

    /// Returns the unique id for this plugin.
    fn get_id(&self) -> Uuid;
}


/// API for external plugins.
///
/// External plugins are plugins that run in a separate process from the main application
/// process. In particular, all plugins written in languages other than Rust will be external
/// plugins. These external plugins still need to be registered on the main application.
pub trait ExternalPlugin: Send + Sync {
    /// A Rust start function for external plugins. This function provides an API for the
    /// external plugin process to retrieve events and publish new events. It also supports
    /// commands, such as "quit", which allows it to shut down.
    /// This API is exposed on a zmq REP socket.
    ///
    /// The function loops, reading one message at a time from `external_socket` and handling
    /// it as follows:
    ///
    /// * `plugin_command: quit` -- leave the loop and return `Ok(())`. No reply is sent.
    /// * `plugin_command: next_msg` -- block until a message arrives on `sub_socket`, then send
    ///   its bytes back over `external_socket`.
    /// * anything else -- treat the message as a new event: publish the bytes unchanged on
    ///   `pub_socket` and reply with `event-engine: msg published`.
    ///
    /// # Errors
    ///
    /// Any socket failure ends the loop and is returned as the matching `EngineError` variant:
    /// `EngineExtSocketRcvError` / `EngineExtSocketSendError` for `external_socket`,
    /// `EngineExtSubSocketRcvError` for `sub_socket`, and `EngineExtPubSocketSendError` for
    /// `pub_socket`.
    fn start(
        &self,
        pub_socket: Socket,
        sub_socket: Socket,
        external_socket: Socket,
    ) -> Result<(), EngineError> {
        // Commands are matched on the raw bytes. Both commands are pure ASCII, so an exact
        // byte comparison accepts precisely the messages a string comparison would, without
        // a UTF-8 decode (which cannot fail for ASCII input) or a per-message allocation.
        const QUIT_COMMAND: &[u8] = b"plugin_command: quit";
        const NEXT_MSG_COMMAND: &[u8] = b"plugin_command: next_msg";

        let plugin_id = self.get_id();
        println!("{}: top of start function for external plugin", plugin_id);
        loop {
            println!("{}: waiting for next message", plugin_id);
            // get the next message
            let msg = external_socket
                .recv_bytes(0)
                .map_err(|e| EngineError::EngineExtSocketRcvError(plugin_id, e))?;
            println!("{}: got message from external plugin", plugin_id);

            if msg == QUIT_COMMAND {
                // NOTE(review): no reply is sent for the quit command; confirm the external
                // process does not wait for one before shutting down.
                println!("{}: got quit message; exiting", plugin_id);
                break;
            }

            if msg == NEXT_MSG_COMMAND {
                println!("{}: got next_msg command, retrieving next message", plugin_id);
                // get the next message and reply with it over the external socket
                let next_msg = sub_socket
                    .recv_bytes(0)
                    .map_err(|e| EngineError::EngineExtSubSocketRcvError(plugin_id, e))?;
                println!("{}: got next message, sengding over external socket", plugin_id);
                external_socket
                    .send(next_msg, 0)
                    .map_err(|e| EngineError::EngineExtSocketSendError(plugin_id, e))?;
                println!("{}: external message sent, loop complete", plugin_id);
                continue;
            }

            // if we are here, the message was not a command, so we assume it is a new event.
            // we publish it on the pub socket and then reply
            pub_socket
                .send(msg, 0)
                .map_err(|e| EngineError::EngineExtPubSocketSendError(plugin_id, e))?;
            external_socket
                .send("event-engine: msg published", 0)
                .map_err(|e| EngineError::EngineExtSocketSendError(plugin_id, e))?;
        }

        Ok(())
    }

    /// The tcp port on which the external plugin will retrieve and publish events.
    fn get_tcp_port(&self) -> i32;

    /// Returns the event subscriptions that this plugin is interested in, as a vector of
    /// boxed `EventType` trait objects, or an `EngineError` if they cannot be produced.
    fn get_subscriptions(&self) -> Result<Vec<Box<dyn EventType>>, EngineError>;

    /// Returns the unique id for this plugin.
    fn get_id(&self) -> Uuid;
}