1use axum::{extract::State, routing::get, Json, Router};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex};
16use std::time::Duration;
17
18use crate::error::ServerError;
19use crate::state::AppState;
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
24#[serde(rename_all = "camelCase")]
25pub struct PollingConfig {
26 pub enabled: bool,
27 pub interval_seconds: u32,
28 pub last_event_ids: HashMap<String, String>,
29 #[serde(skip_serializing_if = "Option::is_none")]
30 pub last_checked_at: Option<String>,
31 pub is_running: bool,
32}
33
34impl Default for PollingConfig {
35 fn default() -> Self {
36 let enabled = std::env::var("GITHUB_POLLING_ENABLED")
38 .unwrap_or_else(|_| "false".to_string())
39 .parse()
40 .unwrap_or(false);
41
42 let interval_seconds = std::env::var("GITHUB_POLLING_INTERVAL")
43 .unwrap_or_else(|_| "30".to_string())
44 .parse()
45 .unwrap_or(30)
46 .max(10); Self {
49 enabled,
50 interval_seconds,
51 last_event_ids: HashMap::new(),
52 last_checked_at: None,
53 is_running: false,
54 }
55 }
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
59#[serde(rename_all = "camelCase")]
60pub struct GitHubEvent {
61 pub id: String,
62 #[serde(rename = "type")]
63 pub event_type: String,
64 pub actor: GitHubActor,
65 pub repo: GitHubRepo,
66 pub payload: serde_json::Value,
67 pub created_at: String,
68}
69
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct GitHubActor {
72 pub login: String,
73 #[serde(skip_serializing_if = "Option::is_none")]
74 pub avatar_url: Option<String>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct GitHubRepo {
79 pub name: String,
80}
81
82#[derive(Debug, Clone, Serialize)]
83#[serde(rename_all = "camelCase")]
84pub struct PollResult {
85 pub repo: String,
86 pub events_found: u32,
87 pub events_processed: u32,
88 pub events_skipped: u32,
89 #[serde(skip_serializing_if = "Option::is_none")]
90 pub new_last_event_id: Option<String>,
91 #[serde(skip_serializing_if = "Option::is_none")]
92 pub error: Option<String>,
93}
94
95#[derive(Debug, Clone, Serialize)]
96#[serde(rename_all = "camelCase")]
97pub struct PollSummary {
98 pub repos_checked: u32,
99 pub total_events_found: u32,
100 pub total_events_processed: u32,
101 pub total_events_skipped: u32,
102}
103
104lazy_static::lazy_static! {
107 static ref POLLING_CONFIG: Mutex<PollingConfig> = Mutex::new(PollingConfig::default());
108 static ref POLLING_HANDLE: Mutex<Option<Arc<tokio::task::JoinHandle<()>>>> = Mutex::new(None);
109}
110
111pub fn start_polling_if_enabled() {
115 let config = POLLING_CONFIG.lock().unwrap().clone();
116 if config.enabled {
117 tracing::info!(
118 "[Polling] Auto-starting from env: interval={}s",
119 config.interval_seconds
120 );
121 start_polling_task();
122 }
123}
124
125fn start_polling_task() {
126 let mut handle_guard = POLLING_HANDLE.lock().unwrap();
127
128 if handle_guard.is_some() {
130 return;
131 }
132
133 let handle = tokio::spawn(async {
134 loop {
135 let interval = {
136 let config = POLLING_CONFIG.lock().unwrap();
137 if !config.enabled {
138 break;
139 }
140 config.interval_seconds
141 };
142
143 tokio::time::sleep(Duration::from_secs(interval as u64)).await;
144
145 if let Err(e) = poll_all_repos().await {
147 tracing::error!("[Polling] Error during poll: {}", e);
148 }
149 }
150 tracing::info!("[Polling] Background task stopped");
151 });
152
153 *handle_guard = Some(Arc::new(handle));
154}
155
156fn stop_polling_task() {
157 let mut handle_guard = POLLING_HANDLE.lock().unwrap();
158 if let Some(handle) = handle_guard.take() {
159 handle.abort();
160 tracing::info!("[Polling] Background task aborted");
161 }
162}
163
164async fn poll_all_repos() -> Result<Vec<PollResult>, String> {
165 let results: Vec<PollResult> = vec![];
173
174 let checked_at = chrono::Utc::now().to_rfc3339();
175 {
176 let mut config = POLLING_CONFIG.lock().unwrap();
177 config.last_checked_at = Some(checked_at);
178 }
179
180 Ok(results)
181}
182
183pub fn router() -> Router<AppState> {
186 Router::new()
187 .route("/config", get(get_config).post(update_config))
188 .route("/check", get(get_status).post(check_now))
189}
190
191async fn get_config() -> Result<Json<serde_json::Value>, ServerError> {
194 let config = POLLING_CONFIG.lock().unwrap().clone();
195
196 Ok(Json(serde_json::json!({
197 "ok": true,
198 "config": config,
199 })))
200}
201
202#[derive(Debug, Deserialize)]
203#[serde(rename_all = "camelCase")]
204struct UpdateConfigRequest {
205 enabled: Option<bool>,
206 interval_seconds: Option<u32>,
207}
208
209async fn update_config(
210 Json(body): Json<UpdateConfigRequest>,
211) -> Result<Json<serde_json::Value>, ServerError> {
212 let mut config = POLLING_CONFIG.lock().unwrap();
213
214 if let Some(enabled) = body.enabled {
215 config.enabled = enabled;
216 config.is_running = enabled;
217
218 drop(config);
220
221 if enabled {
222 start_polling_task();
223 send_notification(
224 "Routa Polling Enabled",
225 "GitHub event polling is now active",
226 );
227 } else {
228 stop_polling_task();
229 send_notification(
230 "Routa Polling Disabled",
231 "GitHub event polling has been stopped",
232 );
233 }
234
235 config = POLLING_CONFIG.lock().unwrap();
237 }
238
239 if let Some(interval) = body.interval_seconds {
240 if interval >= 10 {
241 config.interval_seconds = interval;
242
243 if config.enabled {
245 drop(config);
246 stop_polling_task();
247 start_polling_task();
248 config = POLLING_CONFIG.lock().unwrap();
249 }
250 }
251 }
252
253 let config_clone = config.clone();
254 drop(config);
255
256 Ok(Json(serde_json::json!({
257 "ok": true,
258 "config": config_clone,
259 })))
260}
261
262async fn get_status() -> Result<Json<serde_json::Value>, ServerError> {
263 let config = POLLING_CONFIG.lock().unwrap();
264
265 Ok(Json(serde_json::json!({
266 "ok": true,
267 "isRunning": config.is_running,
268 "lastCheckedAt": config.last_checked_at,
269 "intervalSeconds": config.interval_seconds,
270 "enabled": config.enabled,
271 })))
272}
273
274async fn check_now(State(_state): State<AppState>) -> Result<Json<serde_json::Value>, ServerError> {
275 let results = poll_all_repos()
276 .await
277 .map_err(|e| ServerError::Internal(format!("Polling failed: {}", e)))?;
278
279 let summary = PollSummary {
280 repos_checked: results.len() as u32,
281 total_events_found: results.iter().map(|r| r.events_found).sum(),
282 total_events_processed: results.iter().map(|r| r.events_processed).sum(),
283 total_events_skipped: results.iter().map(|r| r.events_skipped).sum(),
284 };
285
286 send_notification(
288 "Routa Poll Check",
289 &format!(
290 "Checked {} repos, {} events found",
291 summary.repos_checked, summary.total_events_found
292 ),
293 );
294
295 let checked_at = POLLING_CONFIG
296 .lock()
297 .unwrap()
298 .last_checked_at
299 .clone()
300 .unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
301
302 tracing::info!(
303 "[Polling] Check completed: {} repos, {} events",
304 summary.repos_checked,
305 summary.total_events_found
306 );
307
308 Ok(Json(serde_json::json!({
309 "ok": true,
310 "checkedAt": checked_at,
311 "summary": summary,
312 "results": results,
313 })))
314}
315
316fn send_notification(title: &str, body: &str) {
324 #[cfg(target_os = "macos")]
325 {
326 use notify_rust::Notification;
327 let _ = Notification::new()
328 .summary(title)
329 .body(body)
330 .appname("Routa")
331 .sound_name("default")
332 .show();
333 }
334
335 #[cfg(target_os = "windows")]
336 {
337 use notify_rust::Notification;
338 let _ = Notification::new()
339 .summary(title)
340 .body(body)
341 .appname("Routa")
342 .show();
343 }
344
345 #[cfg(target_os = "linux")]
346 {
347 use notify_rust::Notification;
348 let _ = Notification::new()
349 .summary(title)
350 .body(body)
351 .appname("Routa")
352 .show();
353 }
354
355 tracing::debug!("[Notification] {}: {}", title, body);
357}