Skip to main content

deepseek_hooks/
lib.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3
4use anyhow::{Context, Result};
5use async_trait::async_trait;
6use chrono::Utc;
7use deepseek_protocol::EventFrame;
8use serde::{Deserialize, Serialize};
9use serde_json::{Value, json};
10use tokio::io::AsyncWriteExt;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
13#[serde(tag = "type", rename_all = "snake_case")]
14pub enum HookEvent {
15    ResponseStart {
16        response_id: String,
17    },
18    ResponseDelta {
19        response_id: String,
20        delta: String,
21    },
22    ResponseEnd {
23        response_id: String,
24    },
25    ToolLifecycle {
26        response_id: String,
27        tool_name: String,
28        phase: String,
29        payload: Value,
30    },
31    JobLifecycle {
32        job_id: String,
33        phase: String,
34        progress: Option<u8>,
35        detail: Option<String>,
36    },
37    ApprovalLifecycle {
38        approval_id: String,
39        phase: String,
40        reason: Option<String>,
41    },
42    GenericEventFrame {
43        frame: EventFrame,
44    },
45}
46
47impl HookEvent {
48    pub fn to_json(&self) -> Value {
49        serde_json::to_value(self).unwrap_or_else(|_| json!({"type":"serialization_error"}))
50    }
51}
52
53#[async_trait]
54pub trait HookSink: Send + Sync {
55    async fn emit(&self, event: &HookEvent) -> Result<()>;
56}
57
58#[derive(Default)]
59pub struct StdoutHookSink;
60
61#[async_trait]
62impl HookSink for StdoutHookSink {
63    async fn emit(&self, event: &HookEvent) -> Result<()> {
64        println!("{}", event.to_json());
65        Ok(())
66    }
67}
68
69pub struct JsonlHookSink {
70    path: PathBuf,
71}
72
73impl JsonlHookSink {
74    pub fn new(path: PathBuf) -> Self {
75        Self { path }
76    }
77}
78
79#[async_trait]
80impl HookSink for JsonlHookSink {
81    async fn emit(&self, event: &HookEvent) -> Result<()> {
82        if let Some(parent) = self.path.parent() {
83            tokio::fs::create_dir_all(parent).await.with_context(|| {
84                format!("failed to create hook log directory {}", parent.display())
85            })?;
86        }
87        let mut file = tokio::fs::OpenOptions::new()
88            .create(true)
89            .append(true)
90            .open(&self.path)
91            .await
92            .with_context(|| format!("failed to open hook log {}", self.path.display()))?;
93        let payload = json!({
94            "at": Utc::now().to_rfc3339(),
95            "event": event
96        });
97        let encoded = serde_json::to_string(&payload).context("failed to encode hook event")?;
98        file.write_all(encoded.as_bytes())
99            .await
100            .context("failed to write hook event")?;
101        file.write_all(b"\n")
102            .await
103            .context("failed to write hook event newline")?;
104        Ok(())
105    }
106}
107
108pub struct WebhookHookSink {
109    url: String,
110    client: reqwest::Client,
111}
112
113impl WebhookHookSink {
114    pub fn new(url: String) -> Self {
115        Self {
116            url,
117            client: reqwest::Client::new(),
118        }
119    }
120}
121
122#[async_trait]
123impl HookSink for WebhookHookSink {
124    async fn emit(&self, event: &HookEvent) -> Result<()> {
125        let mut retries = 0usize;
126        loop {
127            let resp = self
128                .client
129                .post(&self.url)
130                .json(&json!({
131                    "at": Utc::now().to_rfc3339(),
132                    "event": event,
133                }))
134                .send()
135                .await;
136            match resp {
137                Ok(response) if response.status().is_success() => return Ok(()),
138                Ok(response) => {
139                    if retries >= 2 {
140                        anyhow::bail!("webhook returned non-success status {}", response.status());
141                    }
142                }
143                Err(err) => {
144                    if retries >= 2 {
145                        return Err(err).context("webhook request failed");
146                    }
147                }
148            }
149            retries += 1;
150            tokio::time::sleep(std::time::Duration::from_millis(200 * retries as u64)).await;
151        }
152    }
153}
154
155#[derive(Default, Clone)]
156pub struct HookDispatcher {
157    sinks: Vec<Arc<dyn HookSink>>,
158}
159
160impl HookDispatcher {
161    pub fn add_sink(&mut self, sink: Arc<dyn HookSink>) {
162        self.sinks.push(sink);
163    }
164
165    pub async fn emit(&self, event: HookEvent) {
166        for sink in &self.sinks {
167            let _ = sink.emit(&event).await;
168        }
169    }
170}
171
172#[cfg(test)]
173mod tests {
174    use super::*;
175    use std::sync::Mutex;
176    use std::time::{SystemTime, UNIX_EPOCH};
177
178    #[test]
179    fn hook_event_serializes_with_snake_case_type_and_payload() {
180        let event = HookEvent::ToolLifecycle {
181            response_id: "resp-1".to_string(),
182            tool_name: "shell".to_string(),
183            phase: "end".to_string(),
184            payload: json!({ "exit_code": 0 }),
185        };
186
187        let encoded = event.to_json();
188
189        assert_eq!(encoded["type"], "tool_lifecycle");
190        assert_eq!(encoded["response_id"], "resp-1");
191        assert_eq!(encoded["tool_name"], "shell");
192        assert_eq!(encoded["phase"], "end");
193        assert_eq!(encoded["payload"]["exit_code"], 0);
194    }
195
196    #[tokio::test]
197    async fn jsonl_sink_creates_parent_dir_and_appends_events() {
198        let root = unique_temp_dir("jsonl_sink");
199        let path = root.join("nested").join("hooks.jsonl");
200        let sink = JsonlHookSink::new(path.clone());
201
202        sink.emit(&HookEvent::ResponseStart {
203            response_id: "resp-1".to_string(),
204        })
205        .await
206        .unwrap();
207        sink.emit(&HookEvent::ResponseEnd {
208            response_id: "resp-1".to_string(),
209        })
210        .await
211        .unwrap();
212
213        let raw = std::fs::read_to_string(&path).unwrap();
214        let lines = raw.lines().collect::<Vec<_>>();
215        assert_eq!(lines.len(), 2);
216
217        let first: Value = serde_json::from_str(lines[0]).unwrap();
218        let second: Value = serde_json::from_str(lines[1]).unwrap();
219        assert!(first["at"].as_str().is_some());
220        assert_eq!(first["event"]["type"], "response_start");
221        assert_eq!(first["event"]["response_id"], "resp-1");
222        assert_eq!(second["event"]["type"], "response_end");
223        assert_eq!(second["event"]["response_id"], "resp-1");
224
225        let _ = std::fs::remove_dir_all(root);
226    }
227
228    #[tokio::test]
229    async fn dispatcher_continues_after_sink_error() {
230        let mut dispatcher = HookDispatcher::default();
231        let first = Arc::new(RecordingSink::default());
232        let second = Arc::new(RecordingSink::default());
233
234        dispatcher.add_sink(first.clone());
235        dispatcher.add_sink(Arc::new(FailingSink));
236        dispatcher.add_sink(second.clone());
237
238        dispatcher
239            .emit(HookEvent::ApprovalLifecycle {
240                approval_id: "approval-1".to_string(),
241                phase: "requested".to_string(),
242                reason: Some("needs review".to_string()),
243            })
244            .await;
245
246        assert_eq!(
247            first.events(),
248            vec![json!({
249                "type": "approval_lifecycle",
250                "approval_id": "approval-1",
251                "phase": "requested",
252                "reason": "needs review",
253            })]
254        );
255        assert_eq!(second.events(), first.events());
256    }
257
258    #[derive(Default)]
259    struct RecordingSink {
260        events: Mutex<Vec<Value>>,
261    }
262
263    impl RecordingSink {
264        fn events(&self) -> Vec<Value> {
265            self.events.lock().unwrap().clone()
266        }
267    }
268
269    #[async_trait::async_trait]
270    impl HookSink for RecordingSink {
271        async fn emit(&self, event: &HookEvent) -> Result<()> {
272            self.events.lock().unwrap().push(event.to_json());
273            Ok(())
274        }
275    }
276
277    struct FailingSink;
278
279    #[async_trait::async_trait]
280    impl HookSink for FailingSink {
281        async fn emit(&self, _event: &HookEvent) -> Result<()> {
282            anyhow::bail!("sink failed")
283        }
284    }
285
286    fn unique_temp_dir(label: &str) -> PathBuf {
287        let nanos = SystemTime::now()
288            .duration_since(UNIX_EPOCH)
289            .unwrap()
290            .as_nanos();
291        std::env::temp_dir().join(format!(
292            "deepseek-hooks-{label}-{}-{nanos}",
293            std::process::id()
294        ))
295    }
296}