Skip to main content

envoy/monitor/
mod.rs

1pub mod ci;
2pub mod doc;
3
4use sqlitegraph::GraphEntity;
5
6use serde::{Deserialize, Serialize};
7
8use crate::error::{EnvoyError, Result};
9
10pub const KIND_PROJECT_SUB: &str = "EnvoyProjectSubscription";
11pub const KIND_PROJECT_CFG: &str = "EnvoyProjectConfig";
12
13/// Stateless store for project-scoped agent subscriptions.
14pub struct SubscriptionStore;
15
16impl Default for SubscriptionStore {
17    fn default() -> Self {
18        Self::new()
19    }
20}
21
22impl SubscriptionStore {
23    pub fn new() -> Self {
24        Self
25    }
26
27    pub fn subscribe(
28        &self,
29        graph: &sqlitegraph::SqliteGraph,
30        agent_id: &str,
31        project: &str,
32    ) -> Result<()> {
33        let name = format!("sub-{}-{}", agent_id, project);
34        if graph
35            .find_entity_by_kind_and_name(KIND_PROJECT_SUB, &name)?
36            .is_some()
37        {
38            return Ok(());
39        }
40        let now = chrono::Utc::now().to_rfc3339();
41        let entity = GraphEntity {
42            id: 0,
43            kind: KIND_PROJECT_SUB.to_string(),
44            name,
45            file_path: None,
46            data: serde_json::json!({
47                "agent_id": agent_id,
48                "project": project,
49                "created_at": now,
50            }),
51        };
52        graph.insert_entity(&entity)?;
53        Ok(())
54    }
55
56    pub fn unsubscribe(
57        &self,
58        graph: &sqlitegraph::SqliteGraph,
59        agent_id: &str,
60        project: &str,
61    ) -> Result<()> {
62        let name = format!("sub-{}-{}", agent_id, project);
63        let entity = graph
64            .find_entity_by_kind_and_name(KIND_PROJECT_SUB, &name)?
65            .ok_or_else(|| {
66                EnvoyError::SubscriptionNotFound(agent_id.to_string(), project.to_string())
67            })?;
68        graph.delete_entity(entity.id)?;
69        Ok(())
70    }
71
72    pub fn list(&self, graph: &sqlitegraph::SqliteGraph, agent_id: &str) -> Result<Vec<String>> {
73        let entities = graph.find_entities_by_kind(KIND_PROJECT_SUB)?;
74        Ok(entities
75            .iter()
76            .filter(|e| read_str(&e.data, "agent_id") == agent_id)
77            .map(|e| read_str(&e.data, "project"))
78            .collect())
79    }
80
81    pub fn subscribers(
82        &self,
83        graph: &sqlitegraph::SqliteGraph,
84        project: &str,
85    ) -> Result<Vec<String>> {
86        let entities = graph.find_entities_by_kind(KIND_PROJECT_SUB)?;
87        Ok(entities
88            .iter()
89            .filter(|e| read_str(&e.data, "project") == project)
90            .map(|e| read_str(&e.data, "agent_id"))
91            .collect())
92    }
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct ProjectConfig {
97    pub project: String,
98    pub ci_poll_seconds: u64,
99    pub doc_poll_seconds: u64,
100    pub ci_repo_owner: String,
101    pub doc_files: Vec<String>,
102}
103
104impl Default for ProjectConfig {
105    fn default() -> Self {
106        Self {
107            project: String::new(),
108            ci_poll_seconds: 60,
109            doc_poll_seconds: 300,
110            ci_repo_owner: String::new(),
111            doc_files: vec![],
112        }
113    }
114}
115
116/// Stateless store for per-project polling configuration.
117pub struct ProjectConfigStore;
118
119impl Default for ProjectConfigStore {
120    fn default() -> Self {
121        Self::new()
122    }
123}
124
125impl ProjectConfigStore {
126    pub fn new() -> Self {
127        Self
128    }
129
130    pub fn get(&self, graph: &sqlitegraph::SqliteGraph, project: &str) -> Result<ProjectConfig> {
131        let name = format!("cfg-{}", project);
132        let entity = graph
133            .find_entity_by_kind_and_name(KIND_PROJECT_CFG, &name)?
134            .ok_or_else(|| EnvoyError::ProjectConfigNotFound(project.to_string()))?;
135        Ok(ProjectConfig {
136            project: read_str(&entity.data, "project"),
137            ci_poll_seconds: entity
138                .data
139                .get("ci_poll_seconds")
140                .and_then(|v| v.as_u64())
141                .unwrap_or(60),
142            doc_poll_seconds: entity
143                .data
144                .get("doc_poll_seconds")
145                .and_then(|v| v.as_u64())
146                .unwrap_or(300),
147            ci_repo_owner: read_str(&entity.data, "ci_repo_owner"),
148            doc_files: read_json_array(&entity.data, "doc_files"),
149        })
150    }
151
152    pub fn set(&self, graph: &sqlitegraph::SqliteGraph, config: &ProjectConfig) -> Result<()> {
153        let name = format!("cfg-{}", config.project);
154        let doc_json: Vec<serde_json::Value> = config
155            .doc_files
156            .iter()
157            .map(|s| serde_json::json!(s))
158            .collect();
159        let now = chrono::Utc::now().to_rfc3339();
160        if let Some(mut existing) = graph.find_entity_by_kind_and_name(KIND_PROJECT_CFG, &name)? {
161            existing.data["ci_poll_seconds"] = serde_json::json!(config.ci_poll_seconds);
162            existing.data["doc_poll_seconds"] = serde_json::json!(config.doc_poll_seconds);
163            existing.data["ci_repo_owner"] = serde_json::json!(config.ci_repo_owner);
164            existing.data["doc_files"] = serde_json::json!(doc_json);
165            existing.data["updated_at"] = serde_json::json!(&now);
166            graph.update_entity(&existing)?;
167        } else {
168            let entity = GraphEntity {
169                id: 0,
170                kind: KIND_PROJECT_CFG.to_string(),
171                name,
172                file_path: None,
173                data: serde_json::json!({
174                    "project": config.project,
175                    "ci_poll_seconds": config.ci_poll_seconds,
176                    "doc_poll_seconds": config.doc_poll_seconds,
177                    "ci_repo_owner": config.ci_repo_owner,
178                    "doc_files": doc_json,
179                    "created_at": now,
180                    "updated_at": now,
181                }),
182            };
183            graph.insert_entity(&entity)?;
184        }
185        Ok(())
186    }
187}
188
189fn read_str(data: &serde_json::Value, key: &str) -> String {
190    data.get(key)
191        .and_then(|v| v.as_str())
192        .unwrap_or("")
193        .to_string()
194}
195
196fn read_json_array(data: &serde_json::Value, key: &str) -> Vec<String> {
197    data.get(key)
198        .and_then(|v| v.as_array())
199        .map(|arr| {
200            arr.iter()
201                .filter_map(|v| v.as_str().map(String::from))
202                .collect()
203        })
204        .unwrap_or_default()
205}
206
207#[cfg(test)]
208mod tests {
209    use super::*;
210    use crate::engine::Engine;
211
212    #[test]
213    fn subscribe_and_list() {
214        let engine = Engine::open_in_memory().unwrap();
215        let graph = engine.graph();
216        let store = SubscriptionStore::new();
217        store.subscribe(graph, "agent-1", "magellan").unwrap();
218        let subs = store.list(graph, "agent-1").unwrap();
219        assert!(subs.contains(&"magellan".to_string()));
220        let subscribers = store.subscribers(graph, "magellan").unwrap();
221        assert!(subscribers.contains(&"agent-1".to_string()));
222    }
223
224    #[test]
225    fn unsubscribe() {
226        let engine = Engine::open_in_memory().unwrap();
227        let graph = engine.graph();
228        let store = SubscriptionStore::new();
229        store.subscribe(graph, "agent-1", "magellan").unwrap();
230        store.unsubscribe(graph, "agent-1", "magellan").unwrap();
231        assert!(store.list(graph, "agent-1").unwrap().is_empty());
232    }
233
234    #[test]
235    fn project_config_crud() {
236        let engine = Engine::open_in_memory().unwrap();
237        let graph = engine.graph();
238        let store = ProjectConfigStore::new();
239        let cfg = ProjectConfig {
240            project: "magellan".into(),
241            ci_poll_seconds: 30,
242            doc_poll_seconds: 120,
243            ci_repo_owner: "oldnordic/magellan".into(),
244            doc_files: vec!["CHANGELOG.md".into()],
245        };
246        store.set(graph, &cfg).unwrap();
247        let got = store.get(graph, "magellan").unwrap();
248        assert_eq!(got.ci_poll_seconds, 30);
249        assert_eq!(got.doc_files, vec!["CHANGELOG.md"]);
250    }
251}