Skip to main content

chronicle/daemon/
server.rs

1use anyhow::Result;
2use std::path::Path;
3use std::sync::Arc;
4use tokio::io::{AsyncReadExt, AsyncWriteExt};
5use tokio::net::UnixListener;
6use tokio::sync::{broadcast, Mutex};
7
8use crate::daemon::processor::EventProcessor;
9use crate::db::models::Event;
10use crate::db::models::HookPayload;
11
12pub async fn run(chronicle_dir: &Path, conn: Arc<Mutex<rusqlite::Connection>>) -> Result<()> {
13    let sock_path = chronicle_dir.join("chronicle.sock");
14    if sock_path.exists() {
15        std::fs::remove_file(&sock_path)?;
16    }
17
18    let live_sock_path = chronicle_dir.join("chronicle-live.sock");
19    if live_sock_path.exists() {
20        std::fs::remove_file(&live_sock_path)?;
21    }
22
23    let listener = UnixListener::bind(&sock_path)?;
24    let live_listener = UnixListener::bind(&live_sock_path)?;
25    let (broadcast_tx, _) = broadcast::channel::<Event>(1024);
26    let mut processor = EventProcessor::new(conn, broadcast_tx.clone());
27
28    let pid_path = chronicle_dir.join("daemon.pid");
29    std::fs::write(&pid_path, std::process::id().to_string())?;
30
31    let idle_timeout = std::time::Duration::from_secs(30 * 60);
32    let mut last_activity = std::time::Instant::now();
33    let mut evict_interval = tokio::time::interval(std::time::Duration::from_secs(60));
34
35    let mut sigterm =
36        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
37
38    tracing::info!("Chronicle daemon listening on {}", sock_path.display());
39
40    loop {
41        tokio::select! {
42            _ = sigterm.recv() => {
43                tracing::info!("SIGTERM received, shutting down");
44                break;
45            }
46            _ = evict_interval.tick() => {
47                processor.evict_stale_entries();
48                if last_activity.elapsed() >= idle_timeout {
49                    tracing::info!("Idle timeout reached, shutting down");
50                    break;
51                }
52            }
53            accept_result = live_listener.accept() => {
54                match accept_result {
55                    Ok((stream, _addr)) => {
56                        let mut rx = broadcast_tx.subscribe();
57                        tokio::spawn(async move {
58                            let (_, mut writer) = tokio::io::split(stream);
59                            loop {
60                                match rx.recv().await {
61                                    Ok(event) => {
62                                        let mut data = match serde_json::to_vec(&event) {
63                                            Ok(d) => d,
64                                            Err(_) => continue,
65                                        };
66                                        data.push(b'\n');
67                                        if writer.write_all(&data).await.is_err() {
68                                            break; // client disconnected
69                                        }
70                                    }
71                                    Err(broadcast::error::RecvError::Lagged(n)) => {
72                                        tracing::warn!("Live subscriber lagged by {n} events");
73                                    }
74                                    Err(broadcast::error::RecvError::Closed) => break,
75                                }
76                            }
77                        });
78                    }
79                    Err(e) => {
80                        tracing::error!("Live accept error: {e}");
81                    }
82                }
83            }
84            accept_result = listener.accept() => {
85                match accept_result {
86                    Ok((mut stream, _addr)) => {
87                        last_activity = std::time::Instant::now();
88                        let mut buf = Vec::new();
89                        if let Err(e) = stream.read_to_end(&mut buf).await {
90                            tracing::warn!("Failed to read from socket: {e}");
91                            continue;
92                        }
93                        match serde_json::from_slice::<HookPayload>(&buf) {
94                            Ok(payload) => {
95                                if let Err(e) = processor.process(payload).await {
96                                    tracing::error!("Failed to process event: {e}");
97                                }
98                            }
99                            Err(e) => {
100                                tracing::warn!("Invalid JSON from hook: {e}");
101                            }
102                        }
103                    }
104                    Err(e) => {
105                        tracing::error!("Accept error: {e}");
106                    }
107                }
108            }
109        }
110    }
111
112    let _ = std::fs::remove_file(&sock_path);
113    let _ = std::fs::remove_file(&live_sock_path);
114    let _ = std::fs::remove_file(&chronicle_dir.join("daemon.pid"));
115    processor.clear_pending();
116
117    Ok(())
118}