1use std::path::PathBuf;
2use std::sync::Arc;
3
4use anyhow::{Context, Result};
5use async_trait::async_trait;
6use chrono::Utc;
7use deepseek_protocol::EventFrame;
8use serde::{Deserialize, Serialize};
9use serde_json::{Value, json};
10use tokio::io::AsyncWriteExt;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
13#[serde(tag = "type", rename_all = "snake_case")]
14pub enum HookEvent {
15 ResponseStart {
16 response_id: String,
17 },
18 ResponseDelta {
19 response_id: String,
20 delta: String,
21 },
22 ResponseEnd {
23 response_id: String,
24 },
25 ToolLifecycle {
26 response_id: String,
27 tool_name: String,
28 phase: String,
29 payload: Value,
30 },
31 JobLifecycle {
32 job_id: String,
33 phase: String,
34 progress: Option<u8>,
35 detail: Option<String>,
36 },
37 ApprovalLifecycle {
38 approval_id: String,
39 phase: String,
40 reason: Option<String>,
41 },
42 GenericEventFrame {
43 frame: EventFrame,
44 },
45}
46
47impl HookEvent {
48 pub fn to_json(&self) -> Value {
49 serde_json::to_value(self).unwrap_or_else(|_| json!({"type":"serialization_error"}))
50 }
51}
52
53#[async_trait]
54pub trait HookSink: Send + Sync {
55 async fn emit(&self, event: &HookEvent) -> Result<()>;
56}
57
58#[derive(Default)]
59pub struct StdoutHookSink;
60
61#[async_trait]
62impl HookSink for StdoutHookSink {
63 async fn emit(&self, event: &HookEvent) -> Result<()> {
64 println!("{}", event.to_json());
65 Ok(())
66 }
67}
68
/// Sink that records events in a JSON Lines file.
pub struct JsonlHookSink {
    /// Location of the log file.
    path: PathBuf,
}

impl JsonlHookSink {
    /// Creates a sink that logs to `path`.
    ///
    /// Only the path is stored; this constructor performs no file-system
    /// access.
    pub fn new(path: PathBuf) -> Self {
        JsonlHookSink { path }
    }
}
78
79#[async_trait]
80impl HookSink for JsonlHookSink {
81 async fn emit(&self, event: &HookEvent) -> Result<()> {
82 if let Some(parent) = self.path.parent() {
83 tokio::fs::create_dir_all(parent).await.with_context(|| {
84 format!("failed to create hook log directory {}", parent.display())
85 })?;
86 }
87 let mut file = tokio::fs::OpenOptions::new()
88 .create(true)
89 .append(true)
90 .open(&self.path)
91 .await
92 .with_context(|| format!("failed to open hook log {}", self.path.display()))?;
93 let payload = json!({
94 "at": Utc::now().to_rfc3339(),
95 "event": event
96 });
97 let encoded = serde_json::to_string(&payload).context("failed to encode hook event")?;
98 file.write_all(encoded.as_bytes())
99 .await
100 .context("failed to write hook event")?;
101 file.write_all(b"\n")
102 .await
103 .context("failed to write hook event newline")?;
104 Ok(())
105 }
106}
107
108pub struct WebhookHookSink {
109 url: String,
110 client: reqwest::Client,
111}
112
113impl WebhookHookSink {
114 pub fn new(url: String) -> Self {
115 Self {
116 url,
117 client: reqwest::Client::new(),
118 }
119 }
120}
121
122#[async_trait]
123impl HookSink for WebhookHookSink {
124 async fn emit(&self, event: &HookEvent) -> Result<()> {
125 let mut retries = 0usize;
126 loop {
127 let resp = self
128 .client
129 .post(&self.url)
130 .json(&json!({
131 "at": Utc::now().to_rfc3339(),
132 "event": event,
133 }))
134 .send()
135 .await;
136 match resp {
137 Ok(response) if response.status().is_success() => return Ok(()),
138 Ok(response) => {
139 if retries >= 2 {
140 anyhow::bail!("webhook returned non-success status {}", response.status());
141 }
142 }
143 Err(err) => {
144 if retries >= 2 {
145 return Err(err).context("webhook request failed");
146 }
147 }
148 }
149 retries += 1;
150 tokio::time::sleep(std::time::Duration::from_millis(200 * retries as u64)).await;
151 }
152 }
153}
154
155#[derive(Default, Clone)]
156pub struct HookDispatcher {
157 sinks: Vec<Arc<dyn HookSink>>,
158}
159
160impl HookDispatcher {
161 pub fn add_sink(&mut self, sink: Arc<dyn HookSink>) {
162 self.sinks.push(sink);
163 }
164
165 pub async fn emit(&self, event: HookEvent) {
166 for sink in &self.sinks {
167 let _ = sink.emit(&event).await;
168 }
169 }
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Sink that keeps the JSON form of every event it receives.
    #[derive(Default)]
    struct RecordingSink {
        events: Mutex<Vec<Value>>,
    }

    impl RecordingSink {
        /// Copy of everything recorded so far.
        fn events(&self) -> Vec<Value> {
            let recorded = self.events.lock().unwrap();
            recorded.clone()
        }
    }

    #[async_trait::async_trait]
    impl HookSink for RecordingSink {
        async fn emit(&self, event: &HookEvent) -> Result<()> {
            let encoded = event.to_json();
            self.events.lock().unwrap().push(encoded);
            Ok(())
        }
    }

    /// Sink whose `emit` always returns an error.
    struct FailingSink;

    #[async_trait::async_trait]
    impl HookSink for FailingSink {
        async fn emit(&self, _event: &HookEvent) -> Result<()> {
            Err(anyhow::anyhow!("sink failed"))
        }
    }

    /// Builds a temp-dir path made unique by the process id and the current
    /// time in nanoseconds. The directory itself is not created.
    fn unique_temp_dir(label: &str) -> PathBuf {
        let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        let nanos = since_epoch.as_nanos();
        let dir_name = format!(
            "deepseek-hooks-{label}-{}-{nanos}",
            std::process::id()
        );
        std::env::temp_dir().join(dir_name)
    }

    #[test]
    fn hook_event_serializes_with_snake_case_type_and_payload() {
        let encoded = HookEvent::ToolLifecycle {
            response_id: "resp-1".to_string(),
            tool_name: "shell".to_string(),
            phase: "end".to_string(),
            payload: json!({ "exit_code": 0 }),
        }
        .to_json();

        let expected_strings = [
            ("type", "tool_lifecycle"),
            ("response_id", "resp-1"),
            ("tool_name", "shell"),
            ("phase", "end"),
        ];
        for (key, expected) in expected_strings {
            assert_eq!(encoded[key], expected, "unexpected value for {key}");
        }
        assert_eq!(encoded["payload"]["exit_code"], 0);
    }

    #[tokio::test]
    async fn jsonl_sink_creates_parent_dir_and_appends_events() {
        let root = unique_temp_dir("jsonl_sink");
        let path = root.join("nested").join("hooks.jsonl");
        let sink = JsonlHookSink::new(path.clone());

        let events = [
            HookEvent::ResponseStart {
                response_id: "resp-1".to_string(),
            },
            HookEvent::ResponseEnd {
                response_id: "resp-1".to_string(),
            },
        ];
        for event in &events {
            sink.emit(event).await.unwrap();
        }

        let raw = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = raw.lines().collect();
        assert_eq!(lines.len(), 2);

        let parse = |line: &str| -> Value { serde_json::from_str(line).unwrap() };
        let (first, second) = (parse(lines[0]), parse(lines[1]));
        assert!(first["at"].as_str().is_some());
        assert_eq!(first["event"]["type"], "response_start");
        assert_eq!(first["event"]["response_id"], "resp-1");
        assert_eq!(second["event"]["type"], "response_end");
        assert_eq!(second["event"]["response_id"], "resp-1");

        // Cleanup is best effort; a leftover temp directory is harmless.
        std::fs::remove_dir_all(root).ok();
    }

    #[tokio::test]
    async fn dispatcher_continues_after_sink_error() {
        let first = Arc::new(RecordingSink::default());
        let second = Arc::new(RecordingSink::default());

        // The failing sink sits between the two recorders, so the second
        // recorder only sees the event if the dispatcher keeps going.
        let mut dispatcher = HookDispatcher::default();
        dispatcher.add_sink(first.clone());
        dispatcher.add_sink(Arc::new(FailingSink));
        dispatcher.add_sink(second.clone());

        let event = HookEvent::ApprovalLifecycle {
            approval_id: "approval-1".to_string(),
            phase: "requested".to_string(),
            reason: Some("needs review".to_string()),
        };
        dispatcher.emit(event).await;

        let expected = vec![json!({
            "type": "approval_lifecycle",
            "approval_id": "approval-1",
            "phase": "requested",
            "reason": "needs review",
        })];
        assert_eq!(first.events(), expected);
        assert_eq!(second.events(), first.events());
    }
}