tracevault_cli/commands/
stream.rs1use std::fs::{self, OpenOptions};
2use std::io::{self, BufRead, Read, Seek, SeekFrom, Write};
3use std::path::Path;
4
5use tracevault_core::hooks::{parse_hook_event, HookResponse};
6use tracevault_core::streaming::{StreamEventRequest, StreamEventType};
7
/// Returns the event index currently stored at `counter_path` and persists the next one.
///
/// A missing counter file is treated as index `0`. On success the file holds the
/// returned value plus one, written as decimal text.
///
/// # Errors
///
/// Returns an [`io::Error`] if the file cannot be read or written. The error has
/// kind [`io::ErrorKind::InvalidData`] if the file content is not a valid `i32`,
/// or if the counter is already `i32::MAX` and cannot be advanced (the file is
/// left untouched in that case).
pub fn next_event_index(counter_path: &Path) -> Result<i32, io::Error> {
    let current = if counter_path.exists() {
        fs::read_to_string(counter_path)?
            .trim()
            .parse::<i32>()
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
    } else {
        0
    };

    // A plain `current + 1` panics in debug builds and wraps to a negative index
    // in release builds once the counter reaches `i32::MAX`; fail explicitly.
    let next = current.checked_add(1).ok_or_else(|| {
        io::Error::new(io::ErrorKind::InvalidData, "event counter overflowed i32")
    })?;

    fs::write(counter_path, next.to_string())?;
    Ok(current)
}
21
22pub fn read_new_transcript_lines(
23 transcript_path: &Path,
24 offset_path: &Path,
25) -> Result<(Vec<serde_json::Value>, i64), io::Error> {
26 if !transcript_path.exists() {
27 return Ok((vec![], 0));
28 }
29
30 let offset: i64 = if offset_path.exists() {
31 let content = fs::read_to_string(offset_path)?;
32 content
33 .trim()
34 .parse::<i64>()
35 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
36 } else {
37 0
38 };
39
40 let mut file = fs::File::open(transcript_path)?;
41 file.seek(SeekFrom::Start(offset as u64))?;
42
43 let reader = io::BufReader::new(file);
44 let mut lines = Vec::new();
45 let mut bytes_read = offset;
46
47 for line_result in reader.lines() {
48 let line = line_result?;
49 bytes_read += line.len() as i64 + 1;
51 if line.trim().is_empty() {
52 continue;
53 }
54 if let Ok(value) = serde_json::from_str::<serde_json::Value>(&line) {
55 lines.push(value);
56 }
57 }
58
59 Ok((lines, bytes_read))
60}
61
/// Appends `json` followed by a newline to the file at `pending_path`.
///
/// The file is created if it does not exist; existing content is kept and the
/// new line is added at the end.
///
/// # Errors
///
/// Returns any [`io::Error`] raised while opening or writing the file.
pub fn append_pending(pending_path: &Path, json: &str) -> Result<(), io::Error> {
    let mut options = OpenOptions::new();
    options.append(true).create(true);

    let mut pending_file = options.open(pending_path)?;
    writeln!(pending_file, "{json}")
}
70
/// Returns every non-blank line stored in `pending_path` and deletes the file.
///
/// Lines are returned in file order without their line terminators. If the file
/// does not exist, an empty vector is returned and nothing is touched.
///
/// # Errors
///
/// Returns any [`io::Error`] raised while reading or removing the file.
pub fn drain_pending(pending_path: &Path) -> Result<Vec<String>, io::Error> {
    if !pending_path.exists() {
        return Ok(Vec::new());
    }

    let raw = fs::read_to_string(pending_path)?;

    let mut queued = Vec::new();
    for entry in raw.lines() {
        if entry.trim().is_empty() {
            continue;
        }
        queued.push(entry.to_owned());
    }

    fs::remove_file(pending_path)?;
    Ok(queued)
}
84
85pub async fn run_stream(
86 project_root: &Path,
87 event_type: &str,
88) -> Result<(), Box<dyn std::error::Error>> {
89 let mut input = String::new();
91 io::stdin().read_to_string(&mut input)?;
92 let hook_event = parse_hook_event(&input)?;
93
94 let session_dir = project_root
96 .join(".tracevault")
97 .join("sessions")
98 .join(&hook_event.session_id);
99 fs::create_dir_all(&session_dir)?;
100
101 let counter_path = session_dir.join(".event_counter");
103 let event_index = next_event_index(&counter_path)?;
104
105 let transcript_path = Path::new(&hook_event.transcript_path);
107 let offset_path = session_dir.join(".stream_offset");
108 let (transcript_lines, new_offset) = read_new_transcript_lines(transcript_path, &offset_path)?;
109
110 let stream_event_type = match event_type {
112 "notification" => StreamEventType::SessionStart,
113 "stop" => StreamEventType::SessionEnd,
114 _ => StreamEventType::ToolUse,
115 };
116
117 let req = StreamEventRequest {
118 protocol_version: 1,
119 tool: Some("claude-code".to_string()),
120 event_type: stream_event_type,
121 session_id: hook_event.session_id.clone(),
122 timestamp: chrono::Utc::now(),
123 hook_event_name: Some(hook_event.hook_event_name.clone()),
124 tool_name: hook_event.tool_name.clone(),
125 tool_input: hook_event.tool_input.clone(),
126 tool_response: hook_event.tool_response.clone(),
127 event_index: Some(event_index),
128 transcript_lines: if transcript_lines.is_empty() {
129 None
130 } else {
131 Some(transcript_lines)
132 },
133 transcript_offset: Some(new_offset),
134 model: None,
135 cwd: Some(hook_event.cwd.clone()),
136 final_stats: None,
137 };
138
139 let (server_url, token) = crate::api_client::resolve_credentials(project_root);
141
142 let config =
144 crate::config::TracevaultConfig::load(project_root).ok_or("TracevaultConfig not found")?;
145 let org_slug = config
146 .org_slug
147 .as_deref()
148 .ok_or("org_slug not configured")?;
149 let repo_id = config.repo_id.as_deref().ok_or("repo_id not configured")?;
150
151 let server_url = server_url.ok_or("server_url not configured")?;
153 let client = crate::api_client::ApiClient::new(&server_url, token.as_deref());
154
155 let pending_path = session_dir.join("pending.jsonl");
157 let pending_events = drain_pending(&pending_path)?;
158
159 let mut send_failed = false;
160
161 for pending_json in &pending_events {
163 if let Ok(pending_req) = serde_json::from_str::<StreamEventRequest>(pending_json) {
164 if client
165 .stream_event(org_slug, repo_id, &pending_req)
166 .await
167 .is_err()
168 {
169 for evt in &pending_events {
171 append_pending(&pending_path, evt)?;
172 }
173 send_failed = true;
174 break;
175 }
176 }
177 }
178
179 let req_json = serde_json::to_string(&req)?;
181 if send_failed {
182 append_pending(&pending_path, &req_json)?;
183 } else {
184 match client.stream_event(org_slug, repo_id, &req).await {
185 Ok(_) => {
186 fs::write(&offset_path, new_offset.to_string())?;
188 }
189 Err(_) => {
190 append_pending(&pending_path, &req_json)?;
192 }
193 }
194 }
195
196 let response = HookResponse::allow();
198 println!("{}", serde_json::to_string(&response)?);
199
200 Ok(())
201}