acts_package_state/
lib.rs

1//! Acts postgres store
2
3#![allow(rustdoc::bare_urls)]
4// #![doc = include_str!("../README.md")]
5
6mod config;
7mod package;
8
9#[cfg(test)]
10mod tests;
11
12use acts::{ActError, ActPackage, ActPlugin, ChannelOptions, Result};
13use package::StatePackage;
14
15const CONFIG_NAME: &str = "state";
16#[derive(Clone)]
17pub struct StatePackagePlugin;
18
19#[async_trait::async_trait]
20impl ActPlugin for StatePackagePlugin {
21    async fn on_init(&self, engine: &acts::Engine) -> Result<()> {
22        if !engine.config().has(CONFIG_NAME) {
23            println!(
24                "skip the initialization of StatePackagePlugin for no 'state' secion in config file"
25            );
26            return Ok(());
27        }
28        let config = engine
29            .config()
30            .get::<config::StateConfig>(CONFIG_NAME)
31            .map_err(|err| acts::ActError::Config(format!("get state config error: {}", err)))?;
32
33        let mut client = redis::Client::open(config.database_uri.as_str())
34            .map_err(|err| acts::ActError::Config(format!("create redis client error: {}", err)))?;
35
36        redis::cmd("PING")
37            .exec(&mut client)
38            .map_err(|err| acts::ActError::Config(format!("ping redis error: {}", err)))?;
39
40        let meta = package::StatePackage::meta();
41        engine.extender().register_package(&meta)?;
42
43        let executor = engine.executor();
44        let chan = engine.channel_with_options(&ChannelOptions {
45            id: meta.name.to_string(),
46            ack: true,
47            r#type: "act".to_string(),
48            state: "created".to_string(),
49            uses: meta.name.to_string(),
50            ..Default::default()
51        });
52        chan.on_message(move |e| {
53            // check the params in inputs
54            let Some(params) = e.inputs.get::<serde_json::Value>("params") else {
55                executor
56                    .act()
57                    .error(
58                        &e.pid,
59                        &e.tid,
60                        &ActError::Package("missing 'params' in inputs".to_string()).into(),
61                    )
62                    .unwrap();
63                return;
64            };
65
66            // convert the params to StatePackage
67            let pakage: StatePackage = serde_json::from_value(params).unwrap();
68            match pakage.run(&client, &e.pid) {
69                Ok(ref vars) => {
70                    executor.act().complete(&e.pid, &e.tid, vars).unwrap();
71                }
72                Err(err) => {
73                    executor.act().error(&e.pid, &e.tid, &err.into()).unwrap();
74                }
75            }
76        });
77        Ok(())
78    }
79}