1use std::sync::{Arc, Mutex};
7
8use crate::{
9 handler::{
10 ErrorKind, Handler, StatusKind, TextBlockEvent, TextBlockKind, ToolUseBlockEvent,
11 ToolUseBlockKind, UsageKind,
12 },
13 hook::ToolCall,
14 timeline::event::{ErrorEvent, StatusEvent, UsageEvent},
15};
16
17pub trait WorkerSubscriber: Send {
60 type TextBlockScope: Default + Send + Sync;
69
70 type ToolUseBlockScope: Default + Send + Sync;
72
73 #[allow(unused_variables)]
82 fn on_text_block(&mut self, scope: &mut Self::TextBlockScope, event: &TextBlockEvent) {}
83
84 #[allow(unused_variables)]
88 fn on_tool_use_block(
89 &mut self,
90 scope: &mut Self::ToolUseBlockScope,
91 event: &ToolUseBlockEvent,
92 ) {
93 }
94
95 #[allow(unused_variables)]
101 fn on_usage(&mut self, event: &UsageEvent) {}
102
103 #[allow(unused_variables)]
105 fn on_status(&mut self, event: &StatusEvent) {}
106
107 #[allow(unused_variables)]
109 fn on_error(&mut self, event: &ErrorEvent) {}
110
111 #[allow(unused_variables)]
120 fn on_text_complete(&mut self, text: &str) {}
121
122 #[allow(unused_variables)]
126 fn on_tool_call_complete(&mut self, call: &ToolCall) {}
127
128 #[allow(unused_variables)]
136 fn on_turn_start(&mut self, turn: usize) {}
137
138 #[allow(unused_variables)]
140 fn on_turn_end(&mut self, turn: usize) {}
141}
142
143pub(crate) struct TextBlockSubscriberAdapter<S: WorkerSubscriber> {
153 subscriber: Arc<Mutex<S>>,
154}
155
156impl<S: WorkerSubscriber> TextBlockSubscriberAdapter<S> {
157 pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
158 Self { subscriber }
159 }
160}
161
162impl<S: WorkerSubscriber> Clone for TextBlockSubscriberAdapter<S> {
163 fn clone(&self) -> Self {
164 Self {
165 subscriber: self.subscriber.clone(),
166 }
167 }
168}
169
170pub struct TextBlockScopeWrapper<S: WorkerSubscriber> {
172 inner: S::TextBlockScope,
173 buffer: String, }
175
176impl<S: WorkerSubscriber> Default for TextBlockScopeWrapper<S> {
177 fn default() -> Self {
178 Self {
179 inner: S::TextBlockScope::default(),
180 buffer: String::new(),
181 }
182 }
183}
184
185impl<S: WorkerSubscriber + 'static> Handler<TextBlockKind> for TextBlockSubscriberAdapter<S> {
186 type Scope = TextBlockScopeWrapper<S>;
187
188 fn on_event(&mut self, scope: &mut Self::Scope, event: &TextBlockEvent) {
189 if let TextBlockEvent::Delta(text) = event {
191 scope.buffer.push_str(text);
192 }
193
194 if let Ok(mut subscriber) = self.subscriber.lock() {
196 subscriber.on_text_block(&mut scope.inner, event);
197
198 if matches!(event, TextBlockEvent::Stop(_)) {
200 subscriber.on_text_complete(&scope.buffer);
201 }
202 }
203 }
204}
205
206pub(crate) struct ToolUseBlockSubscriberAdapter<S: WorkerSubscriber> {
212 subscriber: Arc<Mutex<S>>,
213}
214
215impl<S: WorkerSubscriber> ToolUseBlockSubscriberAdapter<S> {
216 pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
217 Self { subscriber }
218 }
219}
220
221impl<S: WorkerSubscriber> Clone for ToolUseBlockSubscriberAdapter<S> {
222 fn clone(&self) -> Self {
223 Self {
224 subscriber: self.subscriber.clone(),
225 }
226 }
227}
228
229pub struct ToolUseBlockScopeWrapper<S: WorkerSubscriber> {
231 inner: S::ToolUseBlockScope,
232 id: String,
233 name: String,
234 input_json: String, }
236
237impl<S: WorkerSubscriber> Default for ToolUseBlockScopeWrapper<S> {
238 fn default() -> Self {
239 Self {
240 inner: S::ToolUseBlockScope::default(),
241 id: String::new(),
242 name: String::new(),
243 input_json: String::new(),
244 }
245 }
246}
247
248impl<S: WorkerSubscriber + 'static> Handler<ToolUseBlockKind> for ToolUseBlockSubscriberAdapter<S> {
249 type Scope = ToolUseBlockScopeWrapper<S>;
250
251 fn on_event(&mut self, scope: &mut Self::Scope, event: &ToolUseBlockEvent) {
252 if let ToolUseBlockEvent::Start(start) = event {
254 scope.id = start.id.clone();
255 scope.name = start.name.clone();
256 }
257
258 if let ToolUseBlockEvent::InputJsonDelta(json) = event {
260 scope.input_json.push_str(json);
261 }
262
263 if let Ok(mut subscriber) = self.subscriber.lock() {
265 subscriber.on_tool_use_block(&mut scope.inner, event);
266
267 if matches!(event, ToolUseBlockEvent::Stop(_)) {
269 let input: serde_json::Value =
270 serde_json::from_str(&scope.input_json).unwrap_or_default();
271 let tool_call = ToolCall {
272 id: scope.id.clone(),
273 name: scope.name.clone(),
274 input,
275 };
276 subscriber.on_tool_call_complete(&tool_call);
277 }
278 }
279 }
280}
281
282pub(crate) struct UsageSubscriberAdapter<S: WorkerSubscriber> {
288 subscriber: Arc<Mutex<S>>,
289}
290
291impl<S: WorkerSubscriber> UsageSubscriberAdapter<S> {
292 pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
293 Self { subscriber }
294 }
295}
296
297impl<S: WorkerSubscriber> Clone for UsageSubscriberAdapter<S> {
298 fn clone(&self) -> Self {
299 Self {
300 subscriber: self.subscriber.clone(),
301 }
302 }
303}
304
305impl<S: WorkerSubscriber + 'static> Handler<UsageKind> for UsageSubscriberAdapter<S> {
306 type Scope = ();
307
308 fn on_event(&mut self, _scope: &mut Self::Scope, event: &UsageEvent) {
309 if let Ok(mut subscriber) = self.subscriber.lock() {
310 subscriber.on_usage(event);
311 }
312 }
313}
314
315pub(crate) struct StatusSubscriberAdapter<S: WorkerSubscriber> {
317 subscriber: Arc<Mutex<S>>,
318}
319
320impl<S: WorkerSubscriber> StatusSubscriberAdapter<S> {
321 pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
322 Self { subscriber }
323 }
324}
325
326impl<S: WorkerSubscriber> Clone for StatusSubscriberAdapter<S> {
327 fn clone(&self) -> Self {
328 Self {
329 subscriber: self.subscriber.clone(),
330 }
331 }
332}
333
334impl<S: WorkerSubscriber + 'static> Handler<StatusKind> for StatusSubscriberAdapter<S> {
335 type Scope = ();
336
337 fn on_event(&mut self, _scope: &mut Self::Scope, event: &StatusEvent) {
338 if let Ok(mut subscriber) = self.subscriber.lock() {
339 subscriber.on_status(event);
340 }
341 }
342}
343
344pub(crate) struct ErrorSubscriberAdapter<S: WorkerSubscriber> {
346 subscriber: Arc<Mutex<S>>,
347}
348
349impl<S: WorkerSubscriber> ErrorSubscriberAdapter<S> {
350 pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
351 Self { subscriber }
352 }
353}
354
355impl<S: WorkerSubscriber> Clone for ErrorSubscriberAdapter<S> {
356 fn clone(&self) -> Self {
357 Self {
358 subscriber: self.subscriber.clone(),
359 }
360 }
361}
362
363impl<S: WorkerSubscriber + 'static> Handler<ErrorKind> for ErrorSubscriberAdapter<S> {
364 type Scope = ();
365
366 fn on_event(&mut self, _scope: &mut Self::Scope, event: &ErrorEvent) {
367 if let Ok(mut subscriber) = self.subscriber.lock() {
368 subscriber.on_error(event);
369 }
370 }
371}