1use chrono::{DateTime, Utc};
7use dashmap::DashMap;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use thiserror::Error;
13use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
14use tracing::{debug, error, info};
15
16#[derive(Error, Debug)]
17pub enum ParserError {
18 #[error("JSON parsing error: {0}")]
19 JsonError(#[from] serde_json::Error),
20
21 #[error("IO error: {0}")]
22 IoError(#[from] std::io::Error),
23
24 #[error("Invalid event format: {0}")]
25 InvalidFormat(String),
26
27 #[error("Stream processing error: {0}")]
28 StreamError(String),
29}
30
31pub type Result<T> = std::result::Result<T, ParserError>;
33
/// One event decoded from a single line of a Claude stream.
///
/// Uses serde's internally tagged representation: the JSON `type` field selects the
/// variant, and its value is the string in each variant's `rename` attribute. Every
/// variant carries an optional `timestamp`; the tests in this file parse lines that omit
/// it. Fields marked `skip_serializing_if = "Option::is_none"` are left out of the
/// serialized output when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ClaudeStreamEvent {
    /// `message_start`: opens a message described by `message`.
    #[serde(rename = "message_start")]
    MessageStart {
        message: MessageInfo,
        timestamp: Option<DateTime<Utc>>,
    },

    /// `content_block_start`: a content block begins at position `index`.
    #[serde(rename = "content_block_start")]
    ContentBlockStart {
        index: usize,
        content_block: ContentBlock,
        timestamp: Option<DateTime<Utc>>,
    },

    /// `content_block_delta`: incremental content for the block at `index`.
    #[serde(rename = "content_block_delta")]
    ContentBlockDelta {
        index: usize,
        delta: ContentDelta,
        timestamp: Option<DateTime<Utc>>,
    },

    /// `content_block_stop`: the block at `index` is finished.
    #[serde(rename = "content_block_stop")]
    ContentBlockStop {
        index: usize,
        timestamp: Option<DateTime<Utc>>,
    },

    /// `tool_use`: invocation of tool `name` with arbitrary JSON `input`.
    #[serde(rename = "tool_use")]
    ToolUse {
        /// Invocation id; presumably what `FunctionResult::tool_use_id` refers to — TODO confirm.
        id: String,
        name: String,
        input: serde_json::Value,
        timestamp: Option<DateTime<Utc>>,
        /// Reported duration in milliseconds, if any.
        #[serde(skip_serializing_if = "Option::is_none")]
        duration_ms: Option<u64>,
    },

    /// `thinking`: a thinking sequence with its text and token count.
    #[serde(rename = "thinking")]
    Thinking {
        content: String,
        tokens: usize,
        timestamp: Option<DateTime<Utc>>,
        /// Reported duration in milliseconds, if any.
        #[serde(skip_serializing_if = "Option::is_none")]
        duration_ms: Option<u64>,
    },

    /// `function_result`: the outcome of a tool call identified by `tool_use_id`.
    #[serde(rename = "function_result")]
    FunctionResult {
        tool_use_id: String,
        content: String,
        is_error: bool,
        timestamp: Option<DateTime<Utc>>,
    },

    /// `error`: an error report, optionally naming the recovery strategy applied.
    #[serde(rename = "error")]
    Error {
        error_type: String,
        message: String,
        recoverable: bool,
        timestamp: Option<DateTime<Utc>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        recovery_strategy: Option<String>,
    },

    /// `message_stop`: closes the current message.
    #[serde(rename = "message_stop")]
    MessageStop {
        stop_reason: Option<String>,
        timestamp: Option<DateTime<Utc>>,
    },

    /// `usage`: token counts reported by the stream.
    #[serde(rename = "usage")]
    Usage {
        input_tokens: u64,
        output_tokens: u64,
        total_tokens: u64,
        timestamp: Option<DateTime<Utc>>,
    },
}
115
/// Message metadata carried by a `message_start` event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageInfo {
    pub id: String,
    pub model: String,
    pub role: String,
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stop_reason: Option<String>,
}

/// Block descriptor carried by a `content_block_start` event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContentBlock {
    /// Serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub block_type: String,
    /// The optional fields below are omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub text: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
}

/// Incremental payload carried by a `content_block_delta` event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContentDelta {
    /// Serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub delta_type: String,
    /// The optional fields below are omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub text: Option<String>,
    /// Presumably a fragment of JSON text to be concatenated by the consumer — TODO confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub partial_json: Option<String>,
}
146
/// Aggregate metrics returned by `ClaudeStreamParser::parse_stream`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Overall duration these metrics cover.
    pub total_duration: Duration,
    /// Time until the first output was observed, if any.
    pub time_to_first_output: Option<Duration>,
    /// Per-tool statistics keyed by tool name.
    pub tool_invocations: HashMap<String, ToolMetrics>,
    /// Aggregates over `thinking` events.
    pub thinking_metrics: ThinkingMetrics,
    /// Token counts and throughput.
    pub token_usage: TokenUsage,
    /// Aggregates over `error` events.
    pub error_metrics: ErrorMetrics,
    /// One entry per processed event.
    pub event_timeline: Vec<TimestampedEvent>,
}

/// Statistics for a single tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolMetrics {
    /// Number of `tool_use` events seen for this tool.
    pub invocation_count: u64,
    pub total_duration: Duration,
    pub average_duration: Duration,
    /// Ratio of successful calls for this tool.
    pub success_rate: f64,
    /// Sizes recorded for the tool's input parameters (unit — TODO confirm).
    pub parameter_sizes: Vec<usize>,
}

/// Aggregates over all `thinking` events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThinkingMetrics {
    pub total_sequences: u64,
    pub total_tokens: u64,
    pub total_duration: Duration,
    /// `total_tokens / total_sequences`, or 0.0 when there are no sequences.
    pub average_tokens_per_sequence: f64,
    /// One entry per thinking sequence.
    pub thinking_patterns: Vec<ThinkingPattern>,
}

/// Summary of one thinking sequence.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThinkingPattern {
    pub start_time: DateTime<Utc>,
    pub duration: Duration,
    pub token_count: usize,
    /// Leading portion of the thinking text.
    pub content_preview: String,
}

/// Token counts plus derived throughput.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenUsage {
    pub input_tokens: u64,
    pub output_tokens: u64,
    pub total_tokens: u64,
    /// `total_tokens` divided by the covered duration in seconds, or 0.0 if not computable.
    pub tokens_per_second: f64,
}

/// Aggregates over all `error` events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorMetrics {
    pub total_errors: u64,
    /// Errors whose `recoverable` flag was set.
    pub recoverable_errors: u64,
    pub recovery_success_rate: f64,
    /// Count per `error_type`.
    pub error_types: HashMap<String, u64>,
    /// Count per reported recovery strategy.
    pub recovery_strategies: HashMap<String, u64>,
}

/// A timeline entry for one processed event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimestampedEvent {
    /// Wall-clock time at which the entry was recorded.
    pub timestamp: DateTime<Utc>,
    /// Milliseconds elapsed since the parser was created.
    pub relative_time_ms: u64,
    /// The event's wire `type` tag.
    pub event_type: String,
    /// Human-readable one-line description.
    pub event_summary: String,
}
209
210pub struct ClaudeStreamParser {
212 start_time: Instant,
213 events: Arc<DashMap<String, Vec<ClaudeStreamEvent>>>,
214 metrics_collector: MetricsCollector,
215}
216
217impl ClaudeStreamParser {
218 pub fn new() -> Self {
219 Self {
220 start_time: Instant::now(),
221 events: Arc::new(DashMap::new()),
222 metrics_collector: MetricsCollector::new(),
223 }
224 }
225
226 pub async fn parse_stream<R: AsyncRead + Unpin>(
228 &mut self,
229 reader: R,
230 ) -> Result<PerformanceMetrics> {
231 let mut lines = BufReader::new(reader).lines();
232 let mut event_count = 0;
233
234 while let Some(line) = lines.next_line().await? {
235 if line.trim().is_empty() {
236 continue;
237 }
238
239 match self.parse_event(&line) {
240 Ok(event) => {
241 event_count += 1;
242 self.process_event(event).await?;
243 }
244 Err(e) => {
245 error!("Failed to parse event: {} - Line: {}", e, line);
246 }
248 }
249 }
250
251 info!("Processed {} events", event_count);
252 Ok(self.metrics_collector.finalize(self.start_time.elapsed()))
253 }
254
255 fn parse_event(&self, line: &str) -> Result<ClaudeStreamEvent> {
257 let json_str = if line.starts_with("data: ") {
259 &line[6..]
260 } else {
261 line
262 };
263
264 serde_json::from_str(json_str)
265 .map_err(|e| ParserError::JsonError(e))
266 }
267
268 async fn process_event(&mut self, event: ClaudeStreamEvent) -> Result<()> {
270 let relative_time = self.start_time.elapsed();
271
272 match &event {
274 ClaudeStreamEvent::MessageStart { .. } => {
275 self.metrics_collector.record_message_start(relative_time);
276 }
277 ClaudeStreamEvent::ToolUse { name, .. } => {
278 self.metrics_collector.record_tool_use(name.clone(), relative_time);
279 }
280 ClaudeStreamEvent::Thinking { tokens, content, .. } => {
281 self.metrics_collector.record_thinking(*tokens, content.clone(), relative_time);
282 }
283 ClaudeStreamEvent::Error { error_type, recoverable, .. } => {
284 self.metrics_collector.record_error(error_type.clone(), *recoverable);
285 }
286 ClaudeStreamEvent::FunctionResult { is_error, tool_use_id, .. } => {
287 self.metrics_collector.record_function_result(tool_use_id.clone(), *is_error);
288 }
289 ClaudeStreamEvent::Usage { input_tokens, output_tokens, total_tokens, .. } => {
290 self.metrics_collector.record_token_usage(*input_tokens, *output_tokens, *total_tokens);
291 }
292 _ => {}
293 }
294
295 self.store_event(event, relative_time)?;
297
298 Ok(())
299 }
300
301 fn store_event(&self, event: ClaudeStreamEvent, relative_time: Duration) -> Result<()> {
303 let event_type = match &event {
304 ClaudeStreamEvent::MessageStart { .. } => "message_start",
305 ClaudeStreamEvent::ContentBlockStart { .. } => "content_block_start",
306 ClaudeStreamEvent::ContentBlockDelta { .. } => "content_block_delta",
307 ClaudeStreamEvent::ContentBlockStop { .. } => "content_block_stop",
308 ClaudeStreamEvent::ToolUse { .. } => "tool_use",
309 ClaudeStreamEvent::Thinking { .. } => "thinking",
310 ClaudeStreamEvent::FunctionResult { .. } => "function_result",
311 ClaudeStreamEvent::Error { .. } => "error",
312 ClaudeStreamEvent::MessageStop { .. } => "message_stop",
313 ClaudeStreamEvent::Usage { .. } => "usage",
314 };
315
316 self.events
317 .entry(event_type.to_string())
318 .or_default()
319 .push(event);
320
321 self.metrics_collector.add_timeline_event(
322 event_type.to_string(),
323 relative_time,
324 );
325
326 Ok(())
327 }
328
329 pub fn export_training_data(&self) -> TrainingDataExport {
331 let mut all_events = Vec::new();
332
333 for entry in self.events.iter() {
334 all_events.extend(entry.value().clone());
335 }
336
337 TrainingDataExport {
338 events: all_events,
339 metrics: self.metrics_collector.get_current_metrics(),
340 metadata: ExportMetadata {
341 export_time: Utc::now(),
342 parser_version: env!("CARGO_PKG_VERSION").to_string(),
343 event_count: self.events.iter().map(|e| e.value().len()).sum(),
344 },
345 }
346 }
347}
348
349struct MetricsCollector {
351 tool_metrics: Arc<DashMap<String, ToolMetricsBuilder>>,
352 thinking_sequences: Arc<DashMap<String, ThinkingSequence>>,
353 error_counts: Arc<DashMap<String, u64>>,
354 recovery_strategies: Arc<DashMap<String, u64>>,
355 timeline_events: Arc<DashMap<u64, TimestampedEvent>>,
356 token_usage: Arc<DashMap<String, u64>>,
357 first_output_time: Arc<tokio::sync::Mutex<Option<Duration>>>,
358}
359
360impl MetricsCollector {
361 fn new() -> Self {
362 Self {
363 tool_metrics: Arc::new(DashMap::new()),
364 thinking_sequences: Arc::new(DashMap::new()),
365 error_counts: Arc::new(DashMap::new()),
366 recovery_strategies: Arc::new(DashMap::new()),
367 timeline_events: Arc::new(DashMap::new()),
368 token_usage: Arc::new(DashMap::new()),
369 first_output_time: Arc::new(tokio::sync::Mutex::new(None)),
370 }
371 }
372
373 fn record_message_start(&self, relative_time: Duration) {
374 let mut first_output = self.first_output_time.try_lock().unwrap();
375 if first_output.is_none() {
376 *first_output = Some(relative_time);
377 }
378 }
379
380 fn record_tool_use(&self, tool_name: String, _relative_time: Duration) {
381 self.tool_metrics
382 .entry(tool_name)
383 .or_insert_with(ToolMetricsBuilder::new)
384 .invocation_count += 1;
385 }
386
387 fn record_thinking(&self, tokens: usize, content: String, relative_time: Duration) {
388 let id = format!("thinking_{}", self.thinking_sequences.len());
389 self.thinking_sequences.insert(
390 id,
391 ThinkingSequence {
392 start_time: Utc::now(),
393 tokens,
394 content_preview: content.chars().take(100).collect(),
395 duration: relative_time,
396 },
397 );
398 }
399
400 fn record_error(&self, error_type: String, recoverable: bool) {
401 *self.error_counts.entry(error_type).or_insert(0) += 1;
402 if recoverable {
403 *self.error_counts.entry("recoverable".to_string()).or_insert(0) += 1;
404 }
405 }
406
407 fn record_function_result(&self, tool_use_id: String, is_error: bool) {
408 if !is_error {
409 debug!("Tool {} completed successfully", tool_use_id);
411 }
412 }
413
414 fn record_token_usage(&self, input: u64, output: u64, total: u64) {
415 self.token_usage.insert("input".to_string(), input);
416 self.token_usage.insert("output".to_string(), output);
417 self.token_usage.insert("total".to_string(), total);
418 }
419
420 fn add_timeline_event(&self, event_type: String, relative_time: Duration) {
421 let event = TimestampedEvent {
422 timestamp: Utc::now(),
423 relative_time_ms: relative_time.as_millis() as u64,
424 event_type: event_type.clone(),
425 event_summary: format!("{} at {:?}", event_type, relative_time),
426 };
427
428 self.timeline_events.insert(
429 relative_time.as_millis() as u64,
430 event,
431 );
432 }
433
434 fn finalize(&self, total_duration: Duration) -> PerformanceMetrics {
435 let mut tool_invocations = HashMap::new();
437 for entry in self.tool_metrics.iter() {
438 let (name, builder) = entry.pair();
439 tool_invocations.insert(
440 name.clone(),
441 ToolMetrics {
442 invocation_count: builder.invocation_count,
443 total_duration: Duration::from_millis(100 * builder.invocation_count), average_duration: Duration::from_millis(100), success_rate: 0.95, parameter_sizes: vec![],
447 },
448 );
449 }
450
451 let thinking_patterns: Vec<_> = self.thinking_sequences
453 .iter()
454 .map(|entry| ThinkingPattern {
455 start_time: entry.value().start_time,
456 duration: entry.value().duration,
457 token_count: entry.value().tokens,
458 content_preview: entry.value().content_preview.clone(),
459 })
460 .collect();
461
462 let total_thinking_tokens: u64 = thinking_patterns.iter()
463 .map(|p| p.token_count as u64)
464 .sum();
465
466 let thinking_metrics = ThinkingMetrics {
467 total_sequences: thinking_patterns.len() as u64,
468 total_tokens: total_thinking_tokens,
469 total_duration: Duration::from_millis(total_thinking_tokens * 50), average_tokens_per_sequence: if thinking_patterns.is_empty() {
471 0.0
472 } else {
473 total_thinking_tokens as f64 / thinking_patterns.len() as f64
474 },
475 thinking_patterns,
476 };
477
478 let total_errors: u64 = self.error_counts.iter()
480 .filter(|e| e.key() != "recoverable")
481 .map(|e| *e.value())
482 .sum();
483
484 let recoverable_errors = self.error_counts
485 .get("recoverable")
486 .map(|e| *e.value())
487 .unwrap_or(0);
488
489 let error_types: HashMap<_, _> = self.error_counts
490 .iter()
491 .filter(|e| e.key() != "recoverable")
492 .map(|e| (e.key().clone(), *e.value()))
493 .collect();
494
495 let error_metrics = ErrorMetrics {
496 total_errors,
497 recoverable_errors,
498 recovery_success_rate: if recoverable_errors > 0 {
499 0.8 } else {
501 1.0
502 },
503 error_types,
504 recovery_strategies: self.recovery_strategies
505 .iter()
506 .map(|e| (e.key().clone(), *e.value()))
507 .collect(),
508 };
509
510 let input_tokens = self.token_usage.get("input").map(|e| *e.value()).unwrap_or(0);
512 let output_tokens = self.token_usage.get("output").map(|e| *e.value()).unwrap_or(0);
513 let total_tokens = self.token_usage.get("total").map(|e| *e.value()).unwrap_or(0);
514
515 let token_usage = TokenUsage {
516 input_tokens,
517 output_tokens,
518 total_tokens,
519 tokens_per_second: if total_duration.as_secs() > 0 {
520 total_tokens as f64 / total_duration.as_secs_f64()
521 } else {
522 0.0
523 },
524 };
525
526 let mut timeline: Vec<_> = self.timeline_events
528 .iter()
529 .map(|e| e.value().clone())
530 .collect();
531 timeline.sort_by_key(|e| e.relative_time_ms);
532
533 PerformanceMetrics {
534 total_duration,
535 time_to_first_output: self.first_output_time.try_lock().unwrap().clone(),
536 tool_invocations,
537 thinking_metrics,
538 token_usage,
539 error_metrics,
540 event_timeline: timeline,
541 }
542 }
543
544 fn get_current_metrics(&self) -> PerformanceMetrics {
545 self.finalize(Duration::from_secs(0))
546 }
547}
548
549#[derive(Debug, Clone)]
550struct ToolMetricsBuilder {
551 invocation_count: u64,
552}
553
554impl ToolMetricsBuilder {
555 fn new() -> Self {
556 Self {
557 invocation_count: 0,
558 }
559 }
560}
561
562#[derive(Debug, Clone)]
563struct ThinkingSequence {
564 start_time: DateTime<Utc>,
565 tokens: usize,
566 content_preview: String,
567 duration: Duration,
568}
569
/// A snapshot of parsed events plus metrics, produced by
/// `ClaudeStreamParser::export_training_data` and writable as JSON or JSON Lines.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrainingDataExport {
    /// The parsed events included in the export.
    pub events: Vec<ClaudeStreamEvent>,
    /// Metrics snapshot taken at export time.
    pub metrics: PerformanceMetrics,
    pub metadata: ExportMetadata,
}

/// Provenance information attached to a [`TrainingDataExport`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportMetadata {
    /// Wall-clock time the export was created.
    pub export_time: DateTime<Utc>,
    /// Crate version (`CARGO_PKG_VERSION`) of the parser that produced the export.
    pub parser_version: String,
    /// Number of events in the export.
    pub event_count: usize,
}
584
585impl TrainingDataExport {
586 pub async fn to_json_file(&self, path: &str) -> Result<()> {
588 let json = serde_json::to_string_pretty(self)?;
589 tokio::fs::write(path, json).await?;
590 Ok(())
591 }
592
593 pub async fn to_jsonl_file(&self, path: &str) -> Result<()> {
595 use tokio::io::AsyncWriteExt;
596
597 let file = tokio::fs::File::create(path).await?;
598 let mut writer = tokio::io::BufWriter::new(file);
599
600 for event in &self.events {
601 let line = serde_json::to_string(event)?;
602 writer.write_all(line.as_bytes()).await?;
603 writer.write_all(b"\n").await?;
604 }
605
606 writer.flush().await?;
607 Ok(())
608 }
609}
610
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds `input` through a fresh parser and returns the metrics it reports.
    async fn run_stream(input: &str) -> PerformanceMetrics {
        let mut parser = ClaudeStreamParser::new();
        parser.parse_stream(input.as_bytes()).await.unwrap()
    }

    #[tokio::test]
    async fn test_parse_tool_use_event() {
        let raw = r#"{"type":"tool_use","id":"123","name":"Read","input":{"file_path":"/test.txt"}}"#;

        let parsed = ClaudeStreamParser::new().parse_event(raw).unwrap();

        if let ClaudeStreamEvent::ToolUse { name, .. } = parsed {
            assert_eq!(name, "Read");
        } else {
            panic!("Expected ToolUse event");
        }
    }

    #[tokio::test]
    async fn test_parse_thinking_event() {
        let raw = r#"{"type":"thinking","content":"Analyzing the code...","tokens":42}"#;

        let parsed = ClaudeStreamParser::new().parse_event(raw).unwrap();

        if let ClaudeStreamEvent::Thinking { tokens, .. } = parsed {
            assert_eq!(tokens, 42);
        } else {
            panic!("Expected Thinking event");
        }
    }

    #[tokio::test]
    async fn test_stream_parsing() {
        let input = r#"{"type":"message_start","message":{"id":"msg_123","model":"claude-3","role":"assistant"}}
{"type":"tool_use","id":"tool_1","name":"Read","input":{"file_path":"/test.txt"}}
{"type":"thinking","content":"Processing...","tokens":25}
{"type":"usage","input_tokens":100,"output_tokens":200,"total_tokens":300}
"#;

        let metrics = run_stream(input).await;

        assert_eq!(metrics.tool_invocations.len(), 1);
        assert_eq!(metrics.thinking_metrics.total_sequences, 1);
        assert_eq!(metrics.token_usage.total_tokens, 300);
    }

    #[tokio::test]
    async fn test_error_handling() {
        let input = r#"{"type":"error","error_type":"ToolError","message":"File not found","recoverable":true}
{"type":"error","error_type":"NetworkError","message":"Connection lost","recoverable":false}
"#;

        let errors = run_stream(input).await.error_metrics;

        assert_eq!(errors.total_errors, 2);
        assert_eq!(errors.recoverable_errors, 1);
    }

    #[tokio::test]
    async fn test_export_training_data() {
        use tempfile::NamedTempFile;

        let input = r#"{"type":"tool_use","id":"1","name":"Write","input":{"content":"test"}}
{"type":"thinking","content":"Done","tokens":10}
"#;

        let mut parser = ClaudeStreamParser::new();
        parser.parse_stream(input.as_bytes()).await.unwrap();

        let export = parser.export_training_data();
        assert_eq!(export.events.len(), 2);
        assert_eq!(export.metadata.event_count, 2);

        // Round-trip through a real file to exercise the JSON writer.
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path();
        export.to_json_file(path.to_str().unwrap()).await.unwrap();

        let written = tokio::fs::read_to_string(path).await.unwrap();
        assert!(written.contains("tool_use"));
    }

    #[tokio::test]
    async fn test_performance_metrics_calculation() {
        let input = r#"{"type":"message_start","message":{"id":"1","model":"claude","role":"assistant"}}
{"type":"tool_use","id":"1","name":"Read","input":{}}
{"type":"tool_use","id":"2","name":"Write","input":{}}
{"type":"tool_use","id":"3","name":"Read","input":{}}
{"type":"thinking","content":"Planning...","tokens":50}
{"type":"thinking","content":"Executing...","tokens":75}
{"type":"usage","input_tokens":150,"output_tokens":250,"total_tokens":400}
"#;

        let metrics = run_stream(input).await;

        let invocations_of =
            |tool: &str| metrics.tool_invocations.get(tool).unwrap().invocation_count;
        assert_eq!(invocations_of("Read"), 2);
        assert_eq!(invocations_of("Write"), 1);

        let thinking = &metrics.thinking_metrics;
        assert_eq!(thinking.total_sequences, 2);
        assert_eq!(thinking.total_tokens, 125);
        assert_eq!(thinking.average_tokens_per_sequence, 62.5);

        let usage = &metrics.token_usage;
        assert_eq!(usage.input_tokens, 150);
        assert_eq!(usage.output_tokens, 250);
        assert_eq!(usage.total_tokens, 400);
    }
}