resymo_agent/collector/
exec.rs1use crate::config::CommonCollector;
2use crate::utils::is_default;
3use anyhow::{anyhow, bail};
4use async_trait::async_trait;
5use homeassistant_agent::model::Discovery;
6use serde_json::{json, Value};
7use std::collections::HashMap;
8use std::ops::Deref;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::process::Command;
12use tokio::sync::Mutex;
13
14#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
15#[serde(rename_all = "camelCase")]
16pub struct Configuration {
17 #[serde(flatten)]
18 pub common: CommonCollector,
19
20 #[serde(default)]
22 pub items: HashMap<String, Task>,
23}
24
25impl Deref for Configuration {
26 type Target = CommonCollector;
27
28 fn deref(&self) -> &Self::Target {
29 &self.common
30 }
31}
32
33#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
34#[serde(rename_all = "camelCase")]
35pub struct Task {
36 #[serde(with = "humantime_serde", default = "default::period")]
37 #[schemars(schema_with = "crate::utils::humantime_duration")]
38 pub period: Duration,
39
40 pub command: String,
42
43 #[serde(default, skip_serializing_if = "Vec::is_empty")]
45 pub args: Vec<String>,
46
47 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
49 pub envs: HashMap<String, String>,
50
51 #[serde(default, skip_serializing_if = "is_default")]
52 pub clean_env: bool,
53
54 #[serde(default, skip_serializing_if = "Vec::is_empty")]
56 pub discovery: Vec<Discovery>,
57}
58
/// Default values referenced from the serde attributes of this file.
mod default {
    use super::Duration;

    /// Number of seconds used when a task does not configure a period.
    const PERIOD_SECS: u64 = 60;

    /// Default for the `period` field of `Task`: one minute.
    pub const fn period() -> Duration {
        Duration::from_secs(PERIOD_SECS)
    }
}
66
/// Message-only error. Being `Clone`, it can be stored as the cached outcome
/// of a task and handed out repeatedly.
#[derive(Debug, Clone)]
pub struct Error(String);
69
70impl From<Error> for anyhow::Error {
71 fn from(value: Error) -> Self {
72 anyhow!("{}", value.0)
73 }
74}
75
76#[derive(Debug)]
77struct Inner {
78 config: Task,
79 last_run: Option<Instant>,
80 state: Result<Value, Error>,
81}
82
83impl Inner {
84 fn new(config: Task) -> Self {
85 Self {
86 config,
87 last_run: None,
88 state: Err(Error("Not yet initialized".into())),
89 }
90 }
91
92 async fn run_once(&mut self) -> anyhow::Result<Value> {
93 let mut cmd = Command::new(&self.config.command);
94
95 if self.config.clean_env {
96 cmd.env_clear();
97 }
98
99 cmd.kill_on_drop(true)
100 .args(self.config.args.clone())
101 .envs(self.config.envs.clone());
102
103 let output = cmd.output().await?;
104 if !output.status.success() {
105 bail!("Command failed: rc == {}", output.status);
106 }
107
108 self.mark_run();
109
110 let stdout = String::from_utf8_lossy(&output.stdout);
111 let stderr = String::from_utf8_lossy(&output.stderr);
112 let status = output.status.code();
113
114 Ok(json!({
115 "stdout": stdout,
116 "stderr": stderr,
117 "status": status,
118 }))
119 }
120
121 async fn run(&mut self) -> anyhow::Result<Value> {
122 if self.need_run() {
123 self.state = self.run_once().await.map_err(|err| Error(err.to_string()));
124 }
125
126 Ok(self.state.clone()?)
127 }
128
129 fn need_run(&self) -> bool {
130 match self.last_run {
131 Some(last_run) => Instant::now() - last_run > self.config.period,
132 None => true,
133 }
134 }
135
136 fn mark_run(&mut self) {
137 self.last_run = Some(Instant::now());
139 }
140}
141
142#[derive(Debug)]
143pub struct Collector {
144 inner: Arc<Mutex<Inner>>,
145 descriptor: Vec<Discovery>,
146}
147
148impl Collector {
149 pub fn new(config: Configuration) -> HashMap<String, Self> {
150 config
151 .items
152 .into_iter()
153 .map(|(name, mut task)| {
154 let mut discovery = task.discovery.drain(..).collect::<Vec<_>>();
155
156 let mut auto = 0;
157 for discovery in &mut discovery {
158 if discovery.unique_id.is_none() {
159 let name = if auto == 0 {
160 name.to_string()
161 } else {
162 format!("{name}_{auto}")
164 };
165 discovery.unique_id = Some(name);
166 auto += 1;
167 }
168
169 if discovery.value_template.is_none() {
170 discovery.value_template = Some("{{ value_json.stdout }}".into());
172 }
173 }
174
175 let collector = Self {
176 descriptor: discovery.into_iter().collect(),
177 inner: Arc::new(Mutex::new(Inner::new(task))),
178 };
179
180 (name, collector)
181 })
182 .collect()
183 }
184}
185
186#[async_trait]
187impl super::Collector for Collector {
188 async fn collect(&self) -> anyhow::Result<Value> {
189 self.inner.lock().await.run().await
190 }
191
192 fn describe_ha(&self) -> Vec<Discovery> {
193 self.descriptor.clone()
194 }
195}