Skip to main content

tracevault_cli/commands/
stream.rs

1use std::fs::{self, OpenOptions};
2use std::io::{self, BufRead, Read, Seek, SeekFrom, Write};
3use std::path::Path;
4
5use tracevault_core::hooks::{parse_hook_event, HookResponse};
6use tracevault_core::streaming::{StreamEventRequest, StreamEventType};
7
8pub fn next_event_index(counter_path: &Path) -> Result<i32, io::Error> {
9    let current = if counter_path.exists() {
10        let content = fs::read_to_string(counter_path)?;
11        content
12            .trim()
13            .parse::<i32>()
14            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
15    } else {
16        0
17    };
18    fs::write(counter_path, (current + 1).to_string())?;
19    Ok(current)
20}
21
22pub fn read_new_transcript_lines(
23    transcript_path: &Path,
24    offset_path: &Path,
25) -> Result<(Vec<serde_json::Value>, i64), io::Error> {
26    if !transcript_path.exists() {
27        return Ok((vec![], 0));
28    }
29
30    let offset: i64 = if offset_path.exists() {
31        let content = fs::read_to_string(offset_path)?;
32        content
33            .trim()
34            .parse::<i64>()
35            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
36    } else {
37        0
38    };
39
40    let mut file = fs::File::open(transcript_path)?;
41    file.seek(SeekFrom::Start(offset as u64))?;
42
43    let reader = io::BufReader::new(file);
44    let mut lines = Vec::new();
45    let mut bytes_read = offset;
46
47    for line_result in reader.lines() {
48        let line = line_result?;
49        // +1 for the newline character
50        bytes_read += line.len() as i64 + 1;
51        if line.trim().is_empty() {
52            continue;
53        }
54        if let Ok(value) = serde_json::from_str::<serde_json::Value>(&line) {
55            lines.push(value);
56        }
57    }
58
59    Ok((lines, bytes_read))
60}
61
62pub fn append_pending(pending_path: &Path, json: &str) -> Result<(), io::Error> {
63    let mut file = OpenOptions::new()
64        .create(true)
65        .append(true)
66        .open(pending_path)?;
67    writeln!(file, "{json}")?;
68    Ok(())
69}
70
71pub fn drain_pending(pending_path: &Path) -> Result<Vec<String>, io::Error> {
72    if !pending_path.exists() {
73        return Ok(vec![]);
74    }
75    let content = fs::read_to_string(pending_path)?;
76    let lines: Vec<String> = content
77        .lines()
78        .filter(|l| !l.trim().is_empty())
79        .map(String::from)
80        .collect();
81    fs::remove_file(pending_path)?;
82    Ok(lines)
83}
84
85pub async fn run_stream(
86    project_root: &Path,
87    event_type: &str,
88) -> Result<(), Box<dyn std::error::Error>> {
89    // 1. Read HookEvent from stdin
90    let mut input = String::new();
91    io::stdin().read_to_string(&mut input)?;
92    let hook_event = parse_hook_event(&input)?;
93
94    // 2. Create session dir
95    let session_dir = project_root
96        .join(".tracevault")
97        .join("sessions")
98        .join(&hook_event.session_id);
99    fs::create_dir_all(&session_dir)?;
100
101    // 3. Get event_index
102    let counter_path = session_dir.join(".event_counter");
103    let event_index = next_event_index(&counter_path)?;
104
105    // 4. Read new transcript lines
106    let transcript_path = Path::new(&hook_event.transcript_path);
107    let offset_path = session_dir.join(".stream_offset");
108    let (transcript_lines, new_offset) = read_new_transcript_lines(transcript_path, &offset_path)?;
109
110    // 5. Build StreamEventRequest
111    let stream_event_type = match event_type {
112        "notification" => StreamEventType::SessionStart,
113        "stop" => StreamEventType::SessionEnd,
114        _ => StreamEventType::ToolUse,
115    };
116
117    let req = StreamEventRequest {
118        protocol_version: 1,
119        tool: Some("claude-code".to_string()),
120        event_type: stream_event_type,
121        session_id: hook_event.session_id.clone(),
122        timestamp: chrono::Utc::now(),
123        hook_event_name: Some(hook_event.hook_event_name.clone()),
124        tool_name: hook_event.tool_name.clone(),
125        tool_input: hook_event.tool_input.clone(),
126        tool_response: hook_event.tool_response.clone(),
127        event_index: Some(event_index),
128        transcript_lines: if transcript_lines.is_empty() {
129            None
130        } else {
131            Some(transcript_lines)
132        },
133        transcript_offset: Some(new_offset),
134        model: None,
135        cwd: Some(hook_event.cwd.clone()),
136        final_stats: None,
137    };
138
139    // 6. Resolve credentials
140    let (server_url, token) = crate::api_client::resolve_credentials(project_root);
141
142    // 7. Load config for org_slug and repo_id
143    let config =
144        crate::config::TracevaultConfig::load(project_root).ok_or("TracevaultConfig not found")?;
145    let org_slug = config
146        .org_slug
147        .as_deref()
148        .ok_or("org_slug not configured")?;
149    let repo_id = config.repo_id.as_deref().ok_or("repo_id not configured")?;
150
151    // 8. Create ApiClient
152    let server_url = server_url.ok_or("server_url not configured")?;
153    let client = crate::api_client::ApiClient::new(&server_url, token.as_deref());
154
155    // 9. Try drain pending queue and send
156    let pending_path = session_dir.join("pending.jsonl");
157    let pending_events = drain_pending(&pending_path)?;
158
159    let mut send_failed = false;
160
161    // Send pending events first
162    for pending_json in &pending_events {
163        if let Ok(pending_req) = serde_json::from_str::<StreamEventRequest>(pending_json) {
164            if client
165                .stream_event(org_slug, repo_id, &pending_req)
166                .await
167                .is_err()
168            {
169                // Re-queue all remaining pending events
170                for evt in &pending_events {
171                    append_pending(&pending_path, evt)?;
172                }
173                send_failed = true;
174                break;
175            }
176        }
177    }
178
179    // Send current event
180    let req_json = serde_json::to_string(&req)?;
181    if send_failed {
182        append_pending(&pending_path, &req_json)?;
183    } else {
184        match client.stream_event(org_slug, repo_id, &req).await {
185            Ok(_) => {
186                // 10. On success update .stream_offset
187                fs::write(&offset_path, new_offset.to_string())?;
188            }
189            Err(_) => {
190                // 11. On failure append to pending.jsonl
191                append_pending(&pending_path, &req_json)?;
192            }
193        }
194    }
195
196    // 12. Always print HookResponse::allow() to stdout
197    let response = HookResponse::allow();
198    println!("{}", serde_json::to_string(&response)?);
199
200    Ok(())
201}