resymo_agent/collector/
exec.rs

1use crate::config::CommonCollector;
2use crate::utils::is_default;
3use anyhow::{anyhow, bail};
4use async_trait::async_trait;
5use homeassistant_agent::model::Discovery;
6use serde_json::{json, Value};
7use std::collections::HashMap;
8use std::ops::Deref;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::process::Command;
12use tokio::sync::Mutex;
13
14#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
15#[serde(rename_all = "camelCase")]
16pub struct Configuration {
17    #[serde(flatten)]
18    pub common: CommonCollector,
19
20    /// execution tasks
21    #[serde(default)]
22    pub items: HashMap<String, Task>,
23}
24
25impl Deref for Configuration {
26    type Target = CommonCollector;
27
28    fn deref(&self) -> &Self::Target {
29        &self.common
30    }
31}
32
33#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
34#[serde(rename_all = "camelCase")]
35pub struct Task {
36    #[serde(with = "humantime_serde", default = "default::period")]
37    #[schemars(schema_with = "crate::utils::humantime_duration")]
38    pub period: Duration,
39
40    /// The binary to call
41    pub command: String,
42
43    /// The arguments
44    #[serde(default, skip_serializing_if = "Vec::is_empty")]
45    pub args: Vec<String>,
46
47    /// The environment variables
48    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
49    pub envs: HashMap<String, String>,
50
51    #[serde(default, skip_serializing_if = "is_default")]
52    pub clean_env: bool,
53
54    /// The Home Assistant discovery section
55    #[serde(default, skip_serializing_if = "Vec::is_empty")]
56    pub discovery: Vec<Discovery>,
57}
58
/// Default values referenced from the serde attributes of the configuration types.
mod default {
    use super::*;

    /// Number of seconds used as period when a task does not configure one.
    const PERIOD_SECS: u64 = 60;

    /// The period applied to a task whose configuration omits `period`.
    pub const fn period() -> Duration {
        Duration::from_secs(PERIOD_SECS)
    }
}
66
67#[derive(Clone, Debug)]
68pub struct Error(String);
69
70impl From<Error> for anyhow::Error {
71    fn from(value: Error) -> Self {
72        anyhow!("{}", value.0)
73    }
74}
75
76#[derive(Debug)]
77struct Inner {
78    config: Task,
79    last_run: Option<Instant>,
80    state: Result<Value, Error>,
81}
82
83impl Inner {
84    fn new(config: Task) -> Self {
85        Self {
86            config,
87            last_run: None,
88            state: Err(Error("Not yet initialized".into())),
89        }
90    }
91
92    async fn run_once(&mut self) -> anyhow::Result<Value> {
93        let mut cmd = Command::new(&self.config.command);
94
95        if self.config.clean_env {
96            cmd.env_clear();
97        }
98
99        cmd.kill_on_drop(true)
100            .args(self.config.args.clone())
101            .envs(self.config.envs.clone());
102
103        let output = cmd.output().await?;
104        if !output.status.success() {
105            bail!("Command failed: rc == {}", output.status);
106        }
107
108        self.mark_run();
109
110        let stdout = String::from_utf8_lossy(&output.stdout);
111        let stderr = String::from_utf8_lossy(&output.stderr);
112        let status = output.status.code();
113
114        Ok(json!({
115            "stdout": stdout,
116            "stderr": stderr,
117            "status": status,
118        }))
119    }
120
121    async fn run(&mut self) -> anyhow::Result<Value> {
122        if self.need_run() {
123            self.state = self.run_once().await.map_err(|err| Error(err.to_string()));
124        }
125
126        Ok(self.state.clone()?)
127    }
128
129    fn need_run(&self) -> bool {
130        match self.last_run {
131            Some(last_run) => Instant::now() - last_run > self.config.period,
132            None => true,
133        }
134    }
135
136    fn mark_run(&mut self) {
137        // TODO: we should do better and provide a constant delay
138        self.last_run = Some(Instant::now());
139    }
140}
141
142#[derive(Debug)]
143pub struct Collector {
144    inner: Arc<Mutex<Inner>>,
145    descriptor: Vec<Discovery>,
146}
147
148impl Collector {
149    pub fn new(config: Configuration) -> HashMap<String, Self> {
150        config
151            .items
152            .into_iter()
153            .map(|(name, mut task)| {
154                let mut discovery = task.discovery.drain(..).collect::<Vec<_>>();
155
156                let mut auto = 0;
157                for discovery in &mut discovery {
158                    if discovery.unique_id.is_none() {
159                        let name = if auto == 0 {
160                            name.to_string()
161                        } else {
162                            // start suffixing items with a counter, better provide a unique_id
163                            format!("{name}_{auto}")
164                        };
165                        discovery.unique_id = Some(name);
166                        auto += 1;
167                    }
168
169                    if discovery.value_template.is_none() {
170                        // default to stdout
171                        discovery.value_template = Some("{{ value_json.stdout }}".into());
172                    }
173                }
174
175                let collector = Self {
176                    descriptor: discovery.into_iter().collect(),
177                    inner: Arc::new(Mutex::new(Inner::new(task))),
178                };
179
180                (name, collector)
181            })
182            .collect()
183    }
184}
185
186#[async_trait]
187impl super::Collector for Collector {
188    async fn collect(&self) -> anyhow::Result<Value> {
189        self.inner.lock().await.run().await
190    }
191
192    fn describe_ha(&self) -> Vec<Discovery> {
193        self.descriptor.clone()
194    }
195}