1use std::path::PathBuf;
2use std::sync::Arc;
3
4use anyhow::{Context, Result};
5use async_trait::async_trait;
6use chrono::Utc;
7use deepseek_protocol::EventFrame;
8use serde::{Deserialize, Serialize};
9use serde_json::{Value, json};
10use tokio::io::AsyncWriteExt;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
13#[serde(tag = "type", rename_all = "snake_case")]
14pub enum HookEvent {
15 ResponseStart {
16 response_id: String,
17 },
18 ResponseDelta {
19 response_id: String,
20 delta: String,
21 },
22 ResponseEnd {
23 response_id: String,
24 },
25 ToolLifecycle {
26 response_id: String,
27 tool_name: String,
28 phase: String,
29 payload: Value,
30 },
31 JobLifecycle {
32 job_id: String,
33 phase: String,
34 progress: Option<u8>,
35 detail: Option<String>,
36 },
37 ApprovalLifecycle {
38 approval_id: String,
39 phase: String,
40 reason: Option<String>,
41 },
42 GenericEventFrame {
43 frame: EventFrame,
44 },
45}
46
47impl HookEvent {
48 pub fn to_json(&self) -> Value {
49 serde_json::to_value(self).unwrap_or_else(|_| json!({"type":"serialization_error"}))
50 }
51}
52
53#[async_trait]
54pub trait HookSink: Send + Sync {
55 async fn emit(&self, event: &HookEvent) -> Result<()>;
56}
57
58#[derive(Default)]
59pub struct StdoutHookSink;
60
61#[async_trait]
62impl HookSink for StdoutHookSink {
63 async fn emit(&self, event: &HookEvent) -> Result<()> {
64 println!("{}", event.to_json());
65 Ok(())
66 }
67}
68
69pub struct JsonlHookSink {
70 path: PathBuf,
71}
72
73impl JsonlHookSink {
74 pub fn new(path: PathBuf) -> Self {
75 Self { path }
76 }
77}
78
79#[async_trait]
80impl HookSink for JsonlHookSink {
81 async fn emit(&self, event: &HookEvent) -> Result<()> {
82 if let Some(parent) = self.path.parent() {
83 tokio::fs::create_dir_all(parent).await.with_context(|| {
84 format!("failed to create hook log directory {}", parent.display())
85 })?;
86 }
87 let mut file = tokio::fs::OpenOptions::new()
88 .create(true)
89 .append(true)
90 .open(&self.path)
91 .await
92 .with_context(|| format!("failed to open hook log {}", self.path.display()))?;
93 let payload = json!({
94 "at": Utc::now().to_rfc3339(),
95 "event": event
96 });
97 let encoded = serde_json::to_string(&payload).context("failed to encode hook event")?;
98 file.write_all(encoded.as_bytes())
99 .await
100 .context("failed to write hook event")?;
101 file.write_all(b"\n")
102 .await
103 .context("failed to write hook event newline")?;
104 Ok(())
105 }
106}
107
108pub struct WebhookHookSink {
109 url: String,
110 client: reqwest::Client,
111}
112
113impl WebhookHookSink {
114 pub fn new(url: String) -> Self {
115 Self {
116 url,
117 client: reqwest::Client::new(),
118 }
119 }
120}
121
122#[async_trait]
123impl HookSink for WebhookHookSink {
124 async fn emit(&self, event: &HookEvent) -> Result<()> {
125 let mut retries = 0usize;
126 loop {
127 let resp = self
128 .client
129 .post(&self.url)
130 .json(&json!({
131 "at": Utc::now().to_rfc3339(),
132 "event": event,
133 }))
134 .send()
135 .await;
136 match resp {
137 Ok(response) if response.status().is_success() => return Ok(()),
138 Ok(response) => {
139 if retries >= 2 {
140 anyhow::bail!("webhook returned non-success status {}", response.status());
141 }
142 }
143 Err(err) => {
144 if retries >= 2 {
145 return Err(err).context("webhook request failed");
146 }
147 }
148 }
149 retries += 1;
150 tokio::time::sleep(std::time::Duration::from_millis(200 * retries as u64)).await;
151 }
152 }
153}
154
155#[derive(Default, Clone)]
156pub struct HookDispatcher {
157 sinks: Vec<Arc<dyn HookSink>>,
158}
159
160impl HookDispatcher {
161 pub fn add_sink(&mut self, sink: Arc<dyn HookSink>) {
162 self.sinks.push(sink);
163 }
164
165 pub async fn emit(&self, event: HookEvent) {
166 for sink in &self.sinks {
167 let _ = sink.emit(&event).await;
168 }
169 }
170}