chronicle/daemon/
server.rs1use anyhow::Result;
2use std::path::Path;
3use std::sync::Arc;
4use tokio::io::{AsyncReadExt, AsyncWriteExt};
5use tokio::net::UnixListener;
6use tokio::sync::{broadcast, Mutex};
7
8use crate::daemon::processor::EventProcessor;
9use crate::db::models::Event;
10use crate::db::models::HookPayload;
11
12pub async fn run(chronicle_dir: &Path, conn: Arc<Mutex<rusqlite::Connection>>) -> Result<()> {
13 let sock_path = chronicle_dir.join("chronicle.sock");
14 if sock_path.exists() {
15 std::fs::remove_file(&sock_path)?;
16 }
17
18 let live_sock_path = chronicle_dir.join("chronicle-live.sock");
19 if live_sock_path.exists() {
20 std::fs::remove_file(&live_sock_path)?;
21 }
22
23 let listener = UnixListener::bind(&sock_path)?;
24 let live_listener = UnixListener::bind(&live_sock_path)?;
25 let (broadcast_tx, _) = broadcast::channel::<Event>(1024);
26 let mut processor = EventProcessor::new(conn, broadcast_tx.clone());
27
28 let pid_path = chronicle_dir.join("daemon.pid");
29 std::fs::write(&pid_path, std::process::id().to_string())?;
30
31 let idle_timeout = std::time::Duration::from_secs(30 * 60);
32 let mut last_activity = std::time::Instant::now();
33 let mut evict_interval = tokio::time::interval(std::time::Duration::from_secs(60));
34
35 let mut sigterm =
36 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
37
38 tracing::info!("Chronicle daemon listening on {}", sock_path.display());
39
40 loop {
41 tokio::select! {
42 _ = sigterm.recv() => {
43 tracing::info!("SIGTERM received, shutting down");
44 break;
45 }
46 _ = evict_interval.tick() => {
47 processor.evict_stale_entries();
48 if last_activity.elapsed() >= idle_timeout {
49 tracing::info!("Idle timeout reached, shutting down");
50 break;
51 }
52 }
53 accept_result = live_listener.accept() => {
54 match accept_result {
55 Ok((stream, _addr)) => {
56 let mut rx = broadcast_tx.subscribe();
57 tokio::spawn(async move {
58 let (_, mut writer) = tokio::io::split(stream);
59 loop {
60 match rx.recv().await {
61 Ok(event) => {
62 let mut data = match serde_json::to_vec(&event) {
63 Ok(d) => d,
64 Err(_) => continue,
65 };
66 data.push(b'\n');
67 if writer.write_all(&data).await.is_err() {
68 break; }
70 }
71 Err(broadcast::error::RecvError::Lagged(n)) => {
72 tracing::warn!("Live subscriber lagged by {n} events");
73 }
74 Err(broadcast::error::RecvError::Closed) => break,
75 }
76 }
77 });
78 }
79 Err(e) => {
80 tracing::error!("Live accept error: {e}");
81 }
82 }
83 }
84 accept_result = listener.accept() => {
85 match accept_result {
86 Ok((mut stream, _addr)) => {
87 last_activity = std::time::Instant::now();
88 let mut buf = Vec::new();
89 if let Err(e) = stream.read_to_end(&mut buf).await {
90 tracing::warn!("Failed to read from socket: {e}");
91 continue;
92 }
93 match serde_json::from_slice::<HookPayload>(&buf) {
94 Ok(payload) => {
95 if let Err(e) = processor.process(payload).await {
96 tracing::error!("Failed to process event: {e}");
97 }
98 }
99 Err(e) => {
100 tracing::warn!("Invalid JSON from hook: {e}");
101 }
102 }
103 }
104 Err(e) => {
105 tracing::error!("Accept error: {e}");
106 }
107 }
108 }
109 }
110 }
111
112 let _ = std::fs::remove_file(&sock_path);
113 let _ = std::fs::remove_file(&live_sock_path);
114 let _ = std::fs::remove_file(&chronicle_dir.join("daemon.pid"));
115 processor.clear_pending();
116
117 Ok(())
118}