Skip to main content

reaction_plugin/
lib.rs

1//! This crate defines the API between reaction's core and plugins.
2//!
3//! Plugins must be written in Rust, for now.
4//!
5//! This documentation assumes the reader has some knowledge of Rust.
6//! However, if you find that something is unclear, don't hesitate to
7//! [ask for help](https://framagit.org/ppom/reaction/#help), even if you're new to Rust.
8//!
9//! To implement a plugin, one has to provide an implementation of [`PluginInfo`], that provides
10//! the entrypoint for a plugin.
11//! It permits to define `0` to `n` custom stream and action types.
12//!
13//! ## Note on reaction-plugin API stability
14//!
15//! This is the v1 of reaction's plugin interface.
16//! It's quite efficient and complete, but it has the big drawback of being Rust-only and [`tokio`]-only.
17//!
18//! In the future, I'd like to define a language-agnostic interface, which will be a major breaking change in the API.
19//! However, I'll try my best to reduce the necessary code changes for plugins that use this v1.
20//!
21//! ## Naming & calling conventions
22//!
23//! Your plugin should be named `reaction-plugin-$NAME`, eg. `reaction-plugin-postgresql`.
24//! It will be invoked with one positional argument "serve".
25//! ```bash
26//! reaction-plugin-$NAME serve
27//! ```
28//! This can be useful if you want to provide CLI functionnality to your users,
29//! so you can distinguish between a human user and reaction.
30//!
31//! ### State directory
32//!
33//! It will be executed in its own directory, in which it should have write access.
34//! The directory is `$reaction_state_directory/plugin_data/$NAME`.
35//! reaction's [state_directory](https://reaction.ppom.me/reference.html#state_directory)
36//! defaults to its working directory, which is `/var/lib/reaction` in most setups.
37//!
38//! So your plugin directory should most often be `/var/lib/reaction/plugin_data/$NAME`,
39//! but the plugin shouldn't expect that and use the current working directory instead.
40//!
41//! ## Communication
42//!
43//! Communication between the plugin and reaction is based on [`remoc`], which permits to multiplex channels and remote objects/functions/trait
44//! calls over a single transport channel.
45//! The channels read and write channels are stdin and stdout, so you shouldn't use them for something else.
46//!
47//! [`remoc`] builds upon [`tokio`], so you'll need to use tokio too.
48//!
49//! ### Errors
50//!
51//! Errors during:
52//! - config loading in [`PluginInfo::load_config`]
53//! - startup in [`PluginInfo::start`]
54//!
55//! should be returned to reaction by the function's return value, permitting reaction to abort startup.
56//!
57//! During normal runtime, after the plugin has loaded its config and started, and before reaction is quitting, there is no *rusty* way to send errors to reaction.
58//! Then errors can be printed to stderr.
59//! They'll be captured line by line and re-printed by reaction, with the plugin name prepended.
60//!
61//! A line can start with `DEBUG `, `INFO `, `WARN `, `ERROR `.
62//! If it starts with none of the above, the line is assumed to be an error.
63//!
64//! Example:
65//! Those lines:
66//! ```log
67//! WARN This is an official warning from the plugin
68//! Freeeee errrooooorrr
69//! ```
70//! Will become:
71//! ```log
72//! WARN plugin test: This is an official warning from the plugin
73//! ERROR plugin test: Freeeee errrooooorrr
74//! ```
75//!
76//! Plugins should not exit when there is an error: reaction quits only when told to do so,
77//! or if all its streams exit, and won't retry starting a failing plugin or stream.
78//! Please only exit if you're in a 100% failing state.
79//! It's considered better to continue operating in a degraded state than exiting.
80//!
81//! ## Getting started
82//!
83//! If you don't have Rust already installed, follow their [*Getting Started* documentation](https://rust-lang.org/learn/get-started/)
84//! to get rust build tools and learn about editor support.
85//!
86//! Then create a new repository with cargo:
87//!
88//! ```bash
89//! cargo new reaction-plugin-$NAME
90//! cd reaction-plugin-$NAME
91//! ```
92//!
93//! Add required dependencies:
94//!
95//! ```bash
96//! cargo add reaction-plugin tokio
97//! ```
98//!
99//! Replace `src/main.rs` with those contents:
100//!
101//! ```ignore
102//! use reaction_plugin::PluginInfo;
103//!
104//! #[tokio::main]
105//! async fn main() {
106//!     let plugin = MyPlugin::default();
107//!     reaction_plugin::main_loop(plugin).await;
108//! }
109//!
110//! #[derive(Default)]
111//! struct MyPlugin {}
112//!
113//! impl PluginInfo for MyPlugin {
114//!   // ...
115//! }
116//! ```
117//!
118//! Your IDE should now propose to implement missing members of the [`PluginInfo`] trait.
119//! Your journey starts!
120//!
121//! ## Examples
122//!
123//! Core plugins can be found here: <https://framagit.org/ppom/reaction/-/tree/main/plugins>.
124//!
125//! - The "virtual" plugin is the simplest and can serve as a good complete example that links custom stream types and custom action types.
126//! - The "ipset" plugin is a good example of an action-only plugin.
127
128use std::{
129    collections::{BTreeMap, BTreeSet},
130    env::args,
131    error::Error,
132    fmt::Display,
133    process::exit,
134    time::Duration,
135};
136
137use remoc::{
138    Connect, rch,
139    rtc::{self, Server},
140};
141use serde::{Deserialize, Serialize};
142use serde_json::{Number, Value as JValue};
143use tokio::io::{stdin, stdout};
144
145pub mod line;
146pub mod shutdown;
147pub mod time;
148
149/// The only trait that **must** be implemented by a plugin.
150/// It provides lists of stream, filter and action types implemented by a dynamic plugin.
151#[rtc::remote]
152pub trait PluginInfo {
153    /// Return the manifest of the plugin.
154    /// This should not be dynamic, and return always the same manifest.
155    ///
156    /// Example implementation:
157    /// ```
158    /// Ok(Manifest {
159    ///     hello: Hello::new(),
160    ///     streams: BTreeSet::from(["mystreamtype".into()]),
161    ///     actions: BTreeSet::from(["myactiontype".into()]),
162    /// })
163    /// ```
164    ///
165    /// First function called.
166    async fn manifest(&mut self) -> Result<Manifest, rtc::CallError>;
167
168    /// Load all plugin stream and action configurations.
169    /// Must error if config is invalid.
170    ///
171    /// The plugin should not start running mutable commands here:
172    /// It should be ok to quit without cleanup for now.
173    ///
174    /// Each [`StreamConfig`] from the `streams` arg should result in a corresponding [`StreamImpl`] returned, in the same order.
175    /// Each [`ActionConfig`] from the `actions` arg should result in a corresponding [`ActionImpl`] returned, in the same order.
176    ///
177    /// Function called after [`PluginInfo::manifest`].
178    async fn load_config(
179        &mut self,
180        streams: Vec<StreamConfig>,
181        actions: Vec<ActionConfig>,
182    ) -> RemoteResult<(Vec<StreamImpl>, Vec<ActionImpl>)>;
183
184    /// Notify the plugin that setup is finished, permitting a last occasion to report an error that'll make reaction exit.
185    /// All initialization (opening remote connections, starting streams, etc) should happen here.
186    ///
187    /// Function called after [`PluginInfo::load_config`].
188    async fn start(&mut self) -> RemoteResult<()>;
189
190    /// Notify the plugin that reaction is quitting and that the plugin should quit too.
191    /// A few seconds later, the plugin will receive SIGTERM.
192    /// A few seconds later, the plugin will receive SIGKILL.
193    ///
194    /// Function called after [`PluginInfo::start`], when reaction is quitting.
195    async fn close(mut self) -> RemoteResult<()>;
196}
197
198/// The config for one Stream of a type advertised by this plugin.
199///
200/// For example this user config:
201/// ```jsonnet
202/// {
203///   streams: {
204///     mystream: {
205///       type: "mystreamtype",
206///       options: {
207///         key: "value",
208///         num: 3,
209///       },
210///       // filters: ...
211///     },
212///   },
213/// }
214/// ```
215///
216/// would result in the following `StreamConfig`:
217///
218/// ```
219/// StreamConfig {
220///   stream_name: "mystream",
221///   stream_type: "mystreamtype",
222///   config: Value::Object(BTreeMap::from([
223///     ("key", Value::String("value")),
224///     ("num", Value::Integer(3)),
225///   ])),
226/// }
227/// ```
228///
229/// Don't hesitate to take advantage of [`serde_json::from_value`], to deserialize the [`Value`] into a Rust struct:
230///
231/// ```
232/// #[derive(Deserialize)]
233/// struct MyStreamOptions {
234///   key: String,
235///   num: i64,
236/// }
237///
238/// fn validate_config(stream_config: Value) -> Result<MyStreamOptions, serde_json::Error> {
239///   serde_json::from_value(stream_config.into())
240/// }
241/// ```
242#[derive(Serialize, Deserialize, Clone)]
243pub struct StreamConfig {
244    pub stream_name: String,
245    pub stream_type: String,
246    pub config: Value,
247}
248
249/// The config for one Stream of a type advertised by this plugin.
250///
251/// For example this user config:
252/// ```jsonnet
253/// {
254///   streams: {
255///     mystream: {
256///       // ...
257///       filters: {
258///         myfilter: {
259///           // ...
260///           actions: {
261///             myaction: {
262///               type: "myactiontype",
263///               options: {
264///                 boolean: true,
265///                 array: ["item"],
266///               },
267///             },
268///           },
269///         },
270///       },
271///     },
272///   },
273/// }
274/// ```
275///
276/// would result in the following `ActionConfig`:
277///
278/// ```rust
279/// ActionConfig {
280///   action_name: "myaction",
281///   action_type: "myactiontype",
282///   config: Value::Object(BTreeMap::from([
283///     ("boolean", Value::Boolean(true)),
284///     ("array", Value::Array([Value::String("item")])),
285///   ])),
286/// }
287/// ```
288///
289/// Don't hesitate to take advantage of [`serde_json::from_value`], to deserialize the [`Value`] into a Rust struct:
290///
291/// ```rust
292/// #[derive(Deserialize)]
293/// struct MyActionOptions {
294///   boolean: bool,
295///   array: Vec<String>,
296/// }
297///
298/// fn validate_config(action_config: Value) -> Result<MyActionOptions, serde_json::Error> {
299///   serde_json::from_value(action_config.into())
300/// }
301/// ```
302#[derive(Serialize, Deserialize, Clone)]
303pub struct ActionConfig {
304    pub stream_name: String,
305    pub filter_name: String,
306    pub action_name: String,
307    pub action_type: String,
308    pub config: Value,
309    pub patterns: Vec<String>,
310}
311
312/// Mandatory announcement of a plugin's protocol version, stream and action types.
313#[derive(Serialize, Deserialize)]
314pub struct Manifest {
315    // Protocol version.
316    // Just use the [`Hello::new`] constructor that uses this crate's current version.
317    pub hello: Hello,
318    /// Stream types that should be made available to reaction users
319    ///
320    /// ```jsonnet
321    /// {
322    ///   streams: {
323    ///     my_stream: {
324    ///       type: "..."
325    ///       # ↑ all those exposed types
326    ///     }
327    ///   }
328    /// }
329    /// ```
330    pub streams: BTreeSet<String>,
331    /// Action types that should be made available to reaction users
332    ///
333    /// ```jsonnet
334    /// {
335    ///   streams: {
336    ///     mystream: {
337    ///       filters: {
338    ///         myfilter: {
339    ///           actions: {
340    ///             myaction: {
341    ///               type: "myactiontype",
342    ///                # ↑ all those exposed types
343    ///             },
344    ///           },
345    ///         },
346    ///       },
347    ///     },
348    ///   },
349    /// }
350    /// ```
351    pub actions: BTreeSet<String>,
352}
353
354#[derive(Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
355pub struct Hello {
356    /// Major version of the protocol
357    /// Increment means breaking change
358    pub version_major: u32,
359    /// Minor version of the protocol
360    /// Increment means reaction core can handle older version plugins
361    pub version_minor: u32,
362}
363
364impl Hello {
365    /// Constructor that fills a [`Hello`] struct with [`crate`]'s version.
366    /// You should use this in your plugin [`Manifest`].
367    pub fn new() -> Hello {
368        Hello {
369            version_major: env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
370            version_minor: env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
371        }
372    }
373
374    /// Used by the reaction daemon. Permits to check compatibility between two versions.
375    /// Major versions must be the same between the daemon and plugin.
376    /// Minor version of the daemon must be greater than or equal minor version of the plugin.
377    pub fn is_compatible(server: &Hello, plugin: &Hello) -> std::result::Result<(), String> {
378        if server.version_major == plugin.version_major
379            && server.version_minor >= plugin.version_minor
380        {
381            Ok(())
382        } else if plugin.version_major > server.version_major
383            || (plugin.version_major == server.version_major
384                && plugin.version_minor > server.version_minor)
385        {
386            Err("consider upgrading reaction".into())
387        } else {
388            Err("consider upgrading the plugin".into())
389        }
390    }
391}
392
393/// A clone of [`serde_json::Value`].
394/// Implements From & Into [`serde_json::Value`].
395#[derive(Serialize, Deserialize, Clone)]
396pub enum Value {
397    Null,
398    Bool(bool),
399    Integer(i64),
400    Float(f64),
401    String(String),
402    Array(Vec<Value>),
403    Object(BTreeMap<String, Value>),
404}
405
406impl From<JValue> for Value {
407    fn from(value: serde_json::Value) -> Self {
408        match value {
409            JValue::Null => Value::Null,
410            JValue::Bool(b) => Value::Bool(b),
411            JValue::Number(number) => {
412                if let Some(number) = number.as_i64() {
413                    Value::Integer(number)
414                } else if let Some(number) = number.as_f64() {
415                    Value::Float(number)
416                } else {
417                    Value::Null
418                }
419            }
420            JValue::String(s) => Value::String(s.into()),
421            JValue::Array(v) => Value::Array(v.into_iter().map(|e| e.into()).collect()),
422            JValue::Object(m) => Value::Object(m.into_iter().map(|(k, v)| (k, v.into())).collect()),
423        }
424    }
425}
426
427impl Into<JValue> for Value {
428    fn into(self) -> JValue {
429        match self {
430            Value::Null => JValue::Null,
431            Value::Bool(v) => JValue::Bool(v),
432            Value::Integer(v) => JValue::Number(v.into()),
433            Value::Float(v) => JValue::Number(Number::from_f64(v).unwrap()),
434            Value::String(v) => JValue::String(v),
435            Value::Array(v) => JValue::Array(v.into_iter().map(|e| e.into()).collect()),
436            Value::Object(m) => JValue::Object(m.into_iter().map(|(k, v)| (k, v.into())).collect()),
437        }
438    }
439}
440
441/// Represents a Stream handled by a plugin on reaction core's side.
442///
443/// During [`PluginInfo::load_config`], the plugin should create a [`remoc::rch::mpsc::channel`] of [`Line`].
444/// It will keep the sending side for itself and put the receiving side in a [`StreamImpl`].
445///
446/// The plugin should start sending [`Line`]s in the channel only after [`PluginInfo::start`] has been called by reaction core.
447#[derive(Debug, Serialize, Deserialize)]
448pub struct StreamImpl {
449    pub stream: rch::mpsc::Receiver<Line>,
450    /// Whether this stream works standalone, or if it needs other streams or actions to be fed.
451    /// Defaults to true.
452    /// When `false`, reaction will exit if it's the last one standing.
453    #[serde(default = "_true")]
454    pub standalone: bool,
455}
456
457fn _true() -> bool {
458    true
459}
460
461/// Messages passed from the [`StreamImpl`] of a plugin to reaction core
462pub type Line = (String, Duration);
463
464// // Filters
465// // For now, plugins can't handle custom filter implementations.
466// #[derive(Serialize, Deserialize)]
467// pub struct FilterImpl {
468//     pub stream: rch::lr::Sender<Exec>,
469// }
470// #[derive(Serialize, Deserialize)]
471// pub struct Match {
472//     pub match_: String,
473//     pub result: rch::oneshot::Sender<bool>,
474// }
475
476/// Represents an Action handled by a plugin on reaction core's side.
477///
478/// During [`PluginInfo::load_config`], the plugin should create a [`remoc::rch::mpsc::channel`] of [`Exec`].
479/// It will keep the receiving side for itself and put the sending side in a [`ActionImpl`].
480///
481/// The plugin will start receiving [`Exec`]s in the channel from reaction only after [`PluginInfo::start`] has been called by reaction core.
482#[derive(Clone, Serialize, Deserialize)]
483pub struct ActionImpl {
484    pub tx: rch::mpsc::Sender<Exec>,
485}
486
487/// A [trigger](https://reaction.ppom.me/reference.html#trigger) of the Action, sent by reaction core to the plugin.
488///
489/// The plugin should perform the configured action for each received [`Exec`].
490///
491/// Any error during its execution should be logged to stderr, see [`crate#Errors`] for error handling recommandations.
492#[derive(Serialize, Deserialize)]
493pub struct Exec {
494    pub match_: Vec<String>,
495    pub time: Duration,
496}
497
498/// The main loop for a plugin.
499///
500/// Bootstraps the communication with reaction core on the process' stdin and stdout,
501/// then holds the connection and maintains the plugin in a server state.
502///
503/// Your main function should only create a struct that implements [`PluginInfo`]
504/// and then call [`main_loop`]:
505/// ```ignore
506/// #[tokio::main]
507/// async fn main() {
508///     let plugin = MyPlugin::default();
509///     reaction_plugin::main_loop(plugin).await;
510/// }
511/// ```
512pub async fn main_loop<T: PluginInfo + Send + Sync + 'static>(plugin_info: T) {
513    // First check that we're called by reaction
514    let mut args = args();
515    // skip 0th argument
516    let _skip = args.next();
517    if args.next().is_none_or(|arg| arg != "serve") {
518        eprintln!("This plugin is not meant to be called as-is.");
519        eprintln!(
520            "reaction daemon starts plugins itself and communicates with them on stdin, stdout and stderr."
521        );
522        eprintln!("See the doc on plugin configuration: https://reaction.ppom.me/plugins/");
523        exit(1);
524    } else {
525        let (conn, mut tx, _rx): (
526            _,
527            remoc::rch::base::Sender<PluginInfoClient>,
528            remoc::rch::base::Receiver<()>,
529        ) = Connect::io(remoc::Cfg::default(), stdin(), stdout())
530            .await
531            .unwrap();
532
533        let (server, client) = PluginInfoServer::new(plugin_info, 1);
534
535        let (res1, (_, res2), res3) = tokio::join!(tx.send(client), server.serve(), conn);
536        let mut exit_code = 0;
537        if let Err(err) = res1 {
538            eprintln!("ERROR could not send plugin info to reaction: {err}");
539            exit_code = 1;
540        }
541        if let Err(err) = res2 {
542            eprintln!("ERROR could not launch plugin service for reaction: {err}");
543            exit_code = 2;
544        }
545        if let Err(err) = res3 {
546            eprintln!("ERROR connection error with reaction: {err}");
547            exit_code = 3;
548        }
549        exit(exit_code);
550    }
551}
552
553// Errors
554
555pub type RemoteResult<T> = Result<T, RemoteError>;
556
557/// reaction-plugin's Error type.
558#[derive(Debug, Serialize, Deserialize)]
559pub enum RemoteError {
560    /// A connection error that origins from [`remoc`], the crate used for communication on the plugin's `stdin`/`stdout`.
561    ///
562    /// You should not instantiate this type of error yourself.
563    Remoc(rtc::CallError),
564    /// A free String for application-specific errors.
565    ///
566    /// You should only instantiate this type of error yourself, for any error that you encounter at startup and shutdown.
567    ///
568    /// Otherwise, any error during the plugin's runtime should be logged to stderr, see [`crate#Errors`] for error handling recommandations.
569    Plugin(String),
570}
571
572impl Display for RemoteError {
573    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
574        match self {
575            RemoteError::Remoc(call_error) => write!(f, "communication error: {call_error}"),
576            RemoteError::Plugin(err) => write!(f, "{err}"),
577        }
578    }
579}
580
581impl Error for RemoteError {}
582
583impl From<String> for RemoteError {
584    fn from(value: String) -> Self {
585        Self::Plugin(value)
586    }
587}
588
589impl From<&str> for RemoteError {
590    fn from(value: &str) -> Self {
591        Self::Plugin(value.into())
592    }
593}
594
595impl From<rtc::CallError> for RemoteError {
596    fn from(value: rtc::CallError) -> Self {
597        Self::Remoc(value)
598    }
599}