acts_package_state/
lib.rs1#![allow(rustdoc::bare_urls)]
4mod config;
7mod package;
8
9#[cfg(test)]
10mod tests;
11
12use acts::{ActError, ActPackage, ActPlugin, ChannelOptions, Result};
13use package::StatePackage;
14
15const CONFIG_NAME: &str = "state";
16#[derive(Clone)]
17pub struct StatePackagePlugin;
18
19#[async_trait::async_trait]
20impl ActPlugin for StatePackagePlugin {
21 async fn on_init(&self, engine: &acts::Engine) -> Result<()> {
22 if !engine.config().has(CONFIG_NAME) {
23 println!(
24 "skip the initialization of StatePackagePlugin for no 'state' secion in config file"
25 );
26 return Ok(());
27 }
28 let config = engine
29 .config()
30 .get::<config::StateConfig>(CONFIG_NAME)
31 .map_err(|err| acts::ActError::Config(format!("get state config error: {}", err)))?;
32
33 let mut client = redis::Client::open(config.database_uri.as_str())
34 .map_err(|err| acts::ActError::Config(format!("create redis client error: {}", err)))?;
35
36 redis::cmd("PING")
37 .exec(&mut client)
38 .map_err(|err| acts::ActError::Config(format!("ping redis error: {}", err)))?;
39
40 let meta = package::StatePackage::meta();
41 engine.extender().register_package(&meta)?;
42
43 let executor = engine.executor();
44 let chan = engine.channel_with_options(&ChannelOptions {
45 id: meta.name.to_string(),
46 ack: true,
47 r#type: "act".to_string(),
48 state: "created".to_string(),
49 uses: meta.name.to_string(),
50 ..Default::default()
51 });
52 chan.on_message(move |e| {
53 let Some(params) = e.inputs.get::<serde_json::Value>("params") else {
55 executor
56 .act()
57 .error(
58 &e.pid,
59 &e.tid,
60 &ActError::Package("missing 'params' in inputs".to_string()).into(),
61 )
62 .unwrap();
63 return;
64 };
65
66 let pakage: StatePackage = serde_json::from_value(params).unwrap();
68 match pakage.run(&client, &e.pid) {
69 Ok(ref vars) => {
70 executor.act().complete(&e.pid, &e.tid, vars).unwrap();
71 }
72 Err(err) => {
73 executor.act().error(&e.pid, &e.tid, &err.into()).unwrap();
74 }
75 }
76 });
77 Ok(())
78 }
79}