reaction_plugin/lib.rs
1//! This crate defines the API between reaction's core and plugins.
2//!
3//! Plugins must be written in Rust, for now.
4//!
5//! This documentation assumes the reader has some knowledge of Rust.
6//! However, if you find that something is unclear, don't hesitate to
7//! [ask for help](https://framagit.org/ppom/reaction/#help), even if you're new to Rust.
8//!
//! To implement a plugin, provide an implementation of [`PluginInfo`], which is
//! the entrypoint of a plugin.
//! It lets you define `0` to `n` custom stream and action types.
12//!
13//! ## Note on reaction-plugin API stability
14//!
15//! This is the v1 of reaction's plugin interface.
16//! It's quite efficient and complete, but it has the big drawback of being Rust-only and [`tokio`]-only.
17//!
18//! In the future, I'd like to define a language-agnostic interface, which will be a major breaking change in the API.
19//! However, I'll try my best to reduce the necessary code changes for plugins that use this v1.
20//!
21//! ## Naming & calling conventions
22//!
//! Your plugin should be named `reaction-plugin-$NAME`, e.g. `reaction-plugin-postgresql`.
//! reaction invokes it with a single positional argument, `serve`:
25//! ```bash
26//! reaction-plugin-$NAME serve
27//! ```
//! This lets you tell reaction apart from a human user,
//! which is useful if you want to provide CLI functionality to your users.
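//!
//! A minimal sketch of that distinction, assuming a hypothetical `Mode` enum and `mode_from_args` helper
//! (neither is part of this crate): only a first argument equal to `serve` selects plugin mode,
//! anything else can be handled as a human-facing CLI.
//!
//! ```rust
//! // Hypothetical type, for illustration only: how the binary was invoked.
//! #[derive(Debug, PartialEq)]
//! enum Mode {
//!     // Started by reaction: talk to it over stdin/stdout.
//!     Serve,
//!     // Started by a human: the remaining arguments are free for your own CLI.
//!     Cli(Vec<String>),
//! }
//!
//! // Hypothetical helper: classify the process arguments.
//! fn mode_from_args(mut args: impl Iterator<Item = String>) -> Mode {
//!     // Skip the 0th argument (the program name).
//!     let _program = args.next();
//!     let rest: Vec<String> = args.collect();
//!     if rest.first().map(String::as_str) == Some("serve") {
//!         Mode::Serve
//!     } else {
//!         Mode::Cli(rest)
//!     }
//! }
//! ```
//!
//! In `main`, one would typically pass `std::env::args()` to such a helper,
//! call [`main_loop`] for `Mode::Serve` and run the CLI otherwise.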
30//!
31//! ### State directory
32//!
//! The plugin is executed in its own directory, to which it should have write access.
34//! The directory is `$reaction_state_directory/plugin_data/$NAME`.
35//! reaction's [state_directory](https://reaction.ppom.me/reference.html#state_directory)
36//! defaults to its working directory, which is `/var/lib/reaction` in most setups.
37//!
//! So your plugin directory will most often be `/var/lib/reaction/plugin_data/$NAME`,
//! but the plugin shouldn't rely on that path and should use its current working directory instead.
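//!
//! As a small sketch of that advice, the hypothetical helper below (`state_path` and the file name
//! are illustrative, not part of this crate) resolves a state file against the current working
//! directory instead of hard-coding `/var/lib/reaction/...`:
//!
//! ```rust
//! use std::{env, io, path::PathBuf};
//!
//! // `file_name` is whatever file the plugin author chooses to persist.
//! fn state_path(file_name: &str) -> io::Result<PathBuf> {
//!     // reaction starts the plugin inside its own data directory,
//!     // so the working directory is the place to keep state.
//!     Ok(env::current_dir()?.join(file_name))
//! }
//! ```
//!
//! A plain relative path such as `"state.json"` resolves to the same location,
//! as long as the plugin never changes its working directory.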
40//!
41//! ## Communication
42//!
//! Communication between the plugin and reaction is based on [`remoc`], which multiplexes channels and remote object/function/trait
//! calls over a single transport channel.
//! The transport's read and write channels are stdin and stdout, so you shouldn't use them for anything else.
46//!
47//! [`remoc`] builds upon [`tokio`], so you'll need to use tokio too.
48//!
49//! ### Errors
50//!
51//! Errors during:
52//! - config loading in [`PluginInfo::load_config`]
53//! - startup in [`PluginInfo::start`]
54//!
//! should be returned to reaction through the function's return value, so that reaction can abort startup.
//!
//! During normal runtime, after the plugin has loaded its config and started, and before reaction quits, there is no *rusty* way to send errors to reaction.
//! In that phase, print errors to stderr.
59//! They'll be captured line by line and re-printed by reaction, with the plugin name prepended.
60//!
//! A line can start with `DEBUG `, `INFO `, `WARN ` or `ERROR `.
62//! If it starts with none of the above, the line is assumed to be an error.
63//!
//! For example, these lines:
66//! ```log
67//! WARN This is an official warning from the plugin
68//! Freeeee errrooooorrr
69//! ```
70//! Will become:
71//! ```log
72//! WARN plugin test: This is an official warning from the plugin
73//! ERROR plugin test: Freeeee errrooooorrr
74//! ```
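//!
//! The rule above can be sketched as follows. This is a hypothetical `relabel` helper written
//! for illustration from the example output, not reaction's actual implementation:
//!
//! ```rust
//! // Hypothetical helper: rewrite one stderr line of a plugin the way the example above shows.
//! fn relabel(plugin_name: &str, line: &str) -> String {
//!     const LEVELS: [&str; 4] = ["DEBUG", "INFO", "WARN", "ERROR"];
//!     for level in LEVELS {
//!         // A level only counts when it is followed by a space.
//!         let rest = line.strip_prefix(level).and_then(|r| r.strip_prefix(' '));
//!         if let Some(rest) = rest {
//!             return format!("{level} plugin {plugin_name}: {rest}");
//!         }
//!     }
//!     // No recognised prefix: the whole line is assumed to be an error.
//!     format!("ERROR plugin {plugin_name}: {line}")
//! }
//! ```
//!
//! On the plugin side, this means that `eprintln!("WARN {msg}")` is enough to emit a warning,
//! and that multi-line messages are best avoided since each line is classified separately.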
75//!
76//! Plugins should not exit when there is an error: reaction quits only when told to do so,
77//! or if all its streams exit, and won't retry starting a failing plugin or stream.
78//! Please only exit if you're in a 100% failing state.
//! It's considered better to continue operating in a degraded state than to exit.
80//!
81//! ## Getting started
82//!
//! If you don't have Rust installed already, follow the [*Getting Started* documentation](https://rust-lang.org/learn/get-started/)
//! to get the Rust build tools and learn about editor support.
85//!
86//! Then create a new repository with cargo:
87//!
88//! ```bash
89//! cargo new reaction-plugin-$NAME
90//! cd reaction-plugin-$NAME
91//! ```
92//!
93//! Add required dependencies:
94//!
95//! ```bash
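//! cargo add reaction-plugin
//! # `#[tokio::main]` needs the `macros` feature and a runtime feature
//! cargo add tokio --features macros,rt-multi-thread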
97//! ```
98//!
//! Replace `src/main.rs` with these contents:
100//!
//! ```ignore
//! use reaction_plugin::PluginInfo;
//!
//! #[tokio::main]
//! async fn main() {
//!     let plugin = MyPlugin::default();
//!     reaction_plugin::main_loop(plugin).await;
//! }
//!
//! #[derive(Default)]
//! struct MyPlugin {}
//!
//! impl PluginInfo for MyPlugin {
//!     // ...
//! }
//! ```
117//!
//! Your IDE should now offer to implement the missing members of the [`PluginInfo`] trait.
119//! Your journey starts!
120//!
121//! ## Examples
122//!
123//! Core plugins can be found here: <https://framagit.org/ppom/reaction/-/tree/main/plugins>.
124//!
125//! - The "virtual" plugin is the simplest and can serve as a good complete example that links custom stream types and custom action types.
126//! - The "ipset" plugin is a good example of an action-only plugin.
127
128use std::{
129 collections::{BTreeMap, BTreeSet},
130 env::args,
131 error::Error,
132 fmt::Display,
133 process::exit,
134 time::Duration,
135};
136
137use remoc::{
138 Connect, rch,
139 rtc::{self, Server},
140};
141use serde::{Deserialize, Serialize};
142use serde_json::{Number, Value as JValue};
143use tokio::io::{stdin, stdout};
144
145pub mod line;
146pub mod shutdown;
147pub mod time;
148
/// The only trait that **must** be implemented by a plugin.
/// It provides the lists of stream and action types implemented by a dynamic plugin.
151#[rtc::remote]
152pub trait PluginInfo {
    /// Return the manifest of the plugin.
    /// This should not be dynamic: always return the same manifest.
    ///
    /// Example implementation:
    /// ```ignore
    /// Ok(Manifest {
    ///     hello: Hello::new(),
    ///     streams: BTreeSet::from(["mystreamtype".into()]),
    ///     actions: BTreeSet::from(["myactiontype".into()]),
    /// })
    /// ```
    ///
    /// First function called.
166 async fn manifest(&mut self) -> Result<Manifest, rtc::CallError>;
167
168 /// Load all plugin stream and action configurations.
169 /// Must error if config is invalid.
170 ///
    /// The plugin should not start running state-changing commands here:
    /// at this stage it should still be OK to quit without cleanup.
173 ///
174 /// Each [`StreamConfig`] from the `streams` arg should result in a corresponding [`StreamImpl`] returned, in the same order.
175 /// Each [`ActionConfig`] from the `actions` arg should result in a corresponding [`ActionImpl`] returned, in the same order.
176 ///
177 /// Function called after [`PluginInfo::manifest`].
178 async fn load_config(
179 &mut self,
180 streams: Vec<StreamConfig>,
181 actions: Vec<ActionConfig>,
182 ) -> RemoteResult<(Vec<StreamImpl>, Vec<ActionImpl>)>;
183
    /// Notify the plugin that setup is finished, giving it a last chance to report an error that will make reaction exit.
    /// All initialization (opening remote connections, starting streams, etc.) should happen here.
186 ///
187 /// Function called after [`PluginInfo::load_config`].
188 async fn start(&mut self) -> RemoteResult<()>;
189
190 /// Notify the plugin that reaction is quitting and that the plugin should quit too.
191 /// A few seconds later, the plugin will receive SIGTERM.
    /// A few seconds after that, the plugin will receive SIGKILL.
193 ///
194 /// Function called after [`PluginInfo::start`], when reaction is quitting.
195 async fn close(mut self) -> RemoteResult<()>;
196}
197
198/// The config for one Stream of a type advertised by this plugin.
199///
200/// For example this user config:
201/// ```jsonnet
202/// {
203/// streams: {
204/// mystream: {
205/// type: "mystreamtype",
206/// options: {
207/// key: "value",
208/// num: 3,
209/// },
210/// // filters: ...
211/// },
212/// },
213/// }
214/// ```
215///
216/// would result in the following `StreamConfig`:
217///
/// ```ignore
/// StreamConfig {
///     stream_name: "mystream".into(),
///     stream_type: "mystreamtype".into(),
///     config: Value::Object(BTreeMap::from([
///         ("key".into(), Value::String("value".into())),
///         ("num".into(), Value::Integer(3)),
///     ])),
/// }
/// ```
227/// ```
228///
/// Don't hesitate to take advantage of [`serde_json::from_value`] to deserialize the [`Value`] into a Rust struct:
///
/// ```
/// use reaction_plugin::Value;
/// use serde::Deserialize;
///
/// #[derive(Deserialize)]
/// struct MyStreamOptions {
///     key: String,
///     num: i64,
/// }
///
/// fn validate_config(stream_config: Value) -> Result<MyStreamOptions, serde_json::Error> {
///     // `Value` converts into `serde_json::Value`, which `from_value` accepts.
///     serde_json::from_value(stream_config.into())
/// }
/// ```
242#[derive(Serialize, Deserialize, Clone)]
243pub struct StreamConfig {
244 pub stream_name: String,
245 pub stream_type: String,
246 pub config: Value,
247}
248
/// The config for one Action of a type advertised by this plugin.
250///
251/// For example this user config:
252/// ```jsonnet
253/// {
254/// streams: {
255/// mystream: {
256/// // ...
257/// filters: {
258/// myfilter: {
259/// // ...
260/// actions: {
261/// myaction: {
262/// type: "myactiontype",
263/// options: {
264/// boolean: true,
265/// array: ["item"],
266/// },
267/// },
268/// },
269/// },
270/// },
271/// },
272/// },
273/// }
274/// ```
275///
276/// would result in the following `ActionConfig`:
277///
/// ```ignore
/// ActionConfig {
///     stream_name: "mystream".into(),
///     filter_name: "myfilter".into(),
///     action_name: "myaction".into(),
///     action_type: "myactiontype".into(),
///     config: Value::Object(BTreeMap::from([
///         ("boolean".into(), Value::Bool(true)),
///         ("array".into(), Value::Array(vec![Value::String("item".into())])),
///     ])),
///     // plus the `patterns` field, omitted here
/// }
/// ```
288///
/// Don't hesitate to take advantage of [`serde_json::from_value`] to deserialize the [`Value`] into a Rust struct:
///
/// ```rust
/// use reaction_plugin::Value;
/// use serde::Deserialize;
///
/// #[derive(Deserialize)]
/// struct MyActionOptions {
///     boolean: bool,
///     array: Vec<String>,
/// }
///
/// fn validate_config(action_config: Value) -> Result<MyActionOptions, serde_json::Error> {
///     // `Value` converts into `serde_json::Value`, which `from_value` accepts.
///     serde_json::from_value(action_config.into())
/// }
/// ```
302#[derive(Serialize, Deserialize, Clone)]
303pub struct ActionConfig {
304 pub stream_name: String,
305 pub filter_name: String,
306 pub action_name: String,
307 pub action_type: String,
308 pub config: Value,
309 pub patterns: Vec<String>,
310}
311
312/// Mandatory announcement of a plugin's protocol version, stream and action types.
313#[derive(Serialize, Deserialize)]
314pub struct Manifest {
    /// Protocol version.
    /// Just use the [`Hello::new`] constructor, which uses this crate's current version.
317 pub hello: Hello,
318 /// Stream types that should be made available to reaction users
319 ///
320 /// ```jsonnet
321 /// {
322 /// streams: {
323 /// my_stream: {
324 /// type: "..."
325 /// # ↑ all those exposed types
326 /// }
327 /// }
328 /// }
329 /// ```
330 pub streams: BTreeSet<String>,
331 /// Action types that should be made available to reaction users
332 ///
333 /// ```jsonnet
334 /// {
335 /// streams: {
336 /// mystream: {
337 /// filters: {
338 /// myfilter: {
339 /// actions: {
340 /// myaction: {
341 /// type: "myactiontype",
342 /// # ↑ all those exposed types
343 /// },
344 /// },
345 /// },
346 /// },
347 /// },
348 /// },
349 /// }
350 /// ```
351 pub actions: BTreeSet<String>,
352}
353
354#[derive(Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
355pub struct Hello {
    /// Major version of the protocol.
    /// An increment means a breaking change.
    pub version_major: u32,
    /// Minor version of the protocol.
    /// An increment means reaction core can still handle plugins built against an older minor version.
    pub version_minor: u32,
362}
363
364impl Hello {
365 /// Constructor that fills a [`Hello`] struct with [`crate`]'s version.
366 /// You should use this in your plugin [`Manifest`].
367 pub fn new() -> Hello {
368 Hello {
369 version_major: env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
370 version_minor: env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
371 }
372 }
373
    /// Used by the reaction daemon to check compatibility between two versions.
    /// Major versions must be the same between the daemon and the plugin.
    /// The minor version of the daemon must be greater than or equal to the minor version of the plugin.
377 pub fn is_compatible(server: &Hello, plugin: &Hello) -> std::result::Result<(), String> {
378 if server.version_major == plugin.version_major
379 && server.version_minor >= plugin.version_minor
380 {
381 Ok(())
382 } else if plugin.version_major > server.version_major
383 || (plugin.version_major == server.version_major
384 && plugin.version_minor > server.version_minor)
385 {
386 Err("consider upgrading reaction".into())
387 } else {
388 Err("consider upgrading the plugin".into())
389 }
390 }
391}
392
393/// A clone of [`serde_json::Value`].
394/// Implements From & Into [`serde_json::Value`].
395#[derive(Serialize, Deserialize, Clone)]
396pub enum Value {
397 Null,
398 Bool(bool),
399 Integer(i64),
400 Float(f64),
401 String(String),
402 Array(Vec<Value>),
403 Object(BTreeMap<String, Value>),
404}
405
406impl From<JValue> for Value {
407 fn from(value: serde_json::Value) -> Self {
408 match value {
409 JValue::Null => Value::Null,
410 JValue::Bool(b) => Value::Bool(b),
411 JValue::Number(number) => {
412 if let Some(number) = number.as_i64() {
413 Value::Integer(number)
414 } else if let Some(number) = number.as_f64() {
415 Value::Float(number)
416 } else {
417 Value::Null
418 }
419 }
420 JValue::String(s) => Value::String(s.into()),
421 JValue::Array(v) => Value::Array(v.into_iter().map(|e| e.into()).collect()),
422 JValue::Object(m) => Value::Object(m.into_iter().map(|(k, v)| (k, v.into())).collect()),
423 }
424 }
425}
426
// Implementing `From` (rather than `Into`) is the idiomatic direction: `Into` comes for free.
impl From<Value> for JValue {
    fn from(value: Value) -> Self {
        match value {
            Value::Null => JValue::Null,
            Value::Bool(v) => JValue::Bool(v),
            Value::Integer(v) => JValue::Number(v.into()),
            // JSON cannot represent NaN or infinities: map them to null instead of panicking.
            Value::Float(v) => Number::from_f64(v).map_or(JValue::Null, JValue::Number),
            Value::String(v) => JValue::String(v),
            Value::Array(v) => JValue::Array(v.into_iter().map(|e| e.into()).collect()),
            Value::Object(m) => JValue::Object(m.into_iter().map(|(k, v)| (k, v.into())).collect()),
        }
    }
}
440
441/// Represents a Stream handled by a plugin on reaction core's side.
442///
443/// During [`PluginInfo::load_config`], the plugin should create a [`remoc::rch::mpsc::channel`] of [`Line`].
444/// It will keep the sending side for itself and put the receiving side in a [`StreamImpl`].
445///
446/// The plugin should start sending [`Line`]s in the channel only after [`PluginInfo::start`] has been called by reaction core.
447#[derive(Debug, Serialize, Deserialize)]
448pub struct StreamImpl {
449 pub stream: rch::mpsc::Receiver<Line>,
450 /// Whether this stream works standalone, or if it needs other streams or actions to be fed.
451 /// Defaults to true.
452 /// When `false`, reaction will exit if it's the last one standing.
453 #[serde(default = "_true")]
454 pub standalone: bool,
455}
456
457fn _true() -> bool {
458 true
459}
460
461/// Messages passed from the [`StreamImpl`] of a plugin to reaction core
462pub type Line = (String, Duration);
463
464// // Filters
465// // For now, plugins can't handle custom filter implementations.
466// #[derive(Serialize, Deserialize)]
467// pub struct FilterImpl {
468// pub stream: rch::lr::Sender<Exec>,
469// }
470// #[derive(Serialize, Deserialize)]
471// pub struct Match {
472// pub match_: String,
473// pub result: rch::oneshot::Sender<bool>,
474// }
475
476/// Represents an Action handled by a plugin on reaction core's side.
477///
478/// During [`PluginInfo::load_config`], the plugin should create a [`remoc::rch::mpsc::channel`] of [`Exec`].
/// It will keep the receiving side for itself and put the sending side in an [`ActionImpl`].
480///
481/// The plugin will start receiving [`Exec`]s in the channel from reaction only after [`PluginInfo::start`] has been called by reaction core.
482#[derive(Clone, Serialize, Deserialize)]
483pub struct ActionImpl {
484 pub tx: rch::mpsc::Sender<Exec>,
485}
486
487/// A [trigger](https://reaction.ppom.me/reference.html#trigger) of the Action, sent by reaction core to the plugin.
488///
489/// The plugin should perform the configured action for each received [`Exec`].
490///
/// Any error during its execution should be logged to stderr, see [`crate#errors`] for error handling recommendations.
492#[derive(Serialize, Deserialize)]
493pub struct Exec {
494 pub match_: Vec<String>,
495 pub time: Duration,
496}
497
498/// The main loop for a plugin.
499///
500/// Bootstraps the communication with reaction core on the process' stdin and stdout,
501/// then holds the connection and maintains the plugin in a server state.
502///
503/// Your main function should only create a struct that implements [`PluginInfo`]
504/// and then call [`main_loop`]:
505/// ```ignore
506/// #[tokio::main]
507/// async fn main() {
508/// let plugin = MyPlugin::default();
509/// reaction_plugin::main_loop(plugin).await;
510/// }
511/// ```
512pub async fn main_loop<T: PluginInfo + Send + Sync + 'static>(plugin_info: T) {
513 // First check that we're called by reaction
514 let mut args = args();
515 // skip 0th argument
516 let _skip = args.next();
517 if args.next().is_none_or(|arg| arg != "serve") {
518 eprintln!("This plugin is not meant to be called as-is.");
519 eprintln!(
520 "reaction daemon starts plugins itself and communicates with them on stdin, stdout and stderr."
521 );
522 eprintln!("See the doc on plugin configuration: https://reaction.ppom.me/plugins/");
523 exit(1);
524 } else {
525 let (conn, mut tx, _rx): (
526 _,
527 remoc::rch::base::Sender<PluginInfoClient>,
528 remoc::rch::base::Receiver<()>,
529 ) = Connect::io(remoc::Cfg::default(), stdin(), stdout())
530 .await
531 .unwrap();
532
533 let (server, client) = PluginInfoServer::new(plugin_info, 1);
534
535 let (res1, (_, res2), res3) = tokio::join!(tx.send(client), server.serve(), conn);
536 let mut exit_code = 0;
537 if let Err(err) = res1 {
538 eprintln!("ERROR could not send plugin info to reaction: {err}");
539 exit_code = 1;
540 }
541 if let Err(err) = res2 {
542 eprintln!("ERROR could not launch plugin service for reaction: {err}");
543 exit_code = 2;
544 }
545 if let Err(err) = res3 {
546 eprintln!("ERROR connection error with reaction: {err}");
547 exit_code = 3;
548 }
549 exit(exit_code);
550 }
551}
552
553// Errors
554
555pub type RemoteResult<T> = Result<T, RemoteError>;
556
557/// reaction-plugin's Error type.
558#[derive(Debug, Serialize, Deserialize)]
559pub enum RemoteError {
    /// A connection error that originates from [`remoc`], the crate used for communication on the plugin's `stdin`/`stdout`.
561 ///
562 /// You should not instantiate this type of error yourself.
563 Remoc(rtc::CallError),
564 /// A free String for application-specific errors.
565 ///
    /// This is the only variant you should instantiate yourself, for any error that you encounter at startup or shutdown.
    ///
    /// Otherwise, any error during the plugin's runtime should be logged to stderr, see [`crate#errors`] for error handling recommendations.
569 Plugin(String),
570}
571
572impl Display for RemoteError {
573 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
574 match self {
575 RemoteError::Remoc(call_error) => write!(f, "communication error: {call_error}"),
576 RemoteError::Plugin(err) => write!(f, "{err}"),
577 }
578 }
579}
580
581impl Error for RemoteError {}
582
583impl From<String> for RemoteError {
584 fn from(value: String) -> Self {
585 Self::Plugin(value)
586 }
587}
588
589impl From<&str> for RemoteError {
590 fn from(value: &str) -> Self {
591 Self::Plugin(value.into())
592 }
593}
594
595impl From<rtc::CallError> for RemoteError {
596 fn from(value: rtc::CallError) -> Self {
597 Self::Remoc(value)
598 }
599}