Skip to main content

tracevault_cli/commands/
stream.rs

1use std::fs::{self, OpenOptions};
2use std::io::{self, BufRead, Read, Seek, SeekFrom, Write};
3use std::path::Path;
4
5use tracevault_core::hooks::{parse_hook_event, HookResponse};
6use tracevault_core::streaming::{StreamEventRequest, StreamEventType};
7
/// Returns the event index currently stored in `counter_path` and persists
/// the following index for the next caller.
///
/// A missing counter file is treated as index `0`. The file holds a single
/// decimal `i32`; surrounding whitespace is ignored when reading.
///
/// # Errors
///
/// Returns any I/O error from reading or writing the counter file, and an
/// [`io::ErrorKind::InvalidData`] error when the stored text is not an `i32`.
pub fn next_event_index(counter_path: &Path) -> Result<i32, io::Error> {
    // Start from zero and only override it when a counter was persisted before.
    let mut index = 0;
    if counter_path.exists() {
        let raw = fs::read_to_string(counter_path)?;
        index = match raw.trim().parse::<i32>() {
            Ok(stored) => stored,
            Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidData, e)),
        };
    }

    // Persist the successor; the caller receives the pre-increment value.
    let successor = index + 1;
    fs::write(counter_path, successor.to_string())?;
    Ok(index)
}
21
22pub fn read_new_transcript_lines(
23    transcript_path: &Path,
24    offset_path: &Path,
25) -> Result<(Vec<serde_json::Value>, i64), io::Error> {
26    if !transcript_path.exists() {
27        return Ok((vec![], 0));
28    }
29
30    let offset: i64 = if offset_path.exists() {
31        let content = fs::read_to_string(offset_path)?;
32        content
33            .trim()
34            .parse::<i64>()
35            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
36    } else {
37        0
38    };
39
40    let mut file = fs::File::open(transcript_path)?;
41    file.seek(SeekFrom::Start(offset as u64))?;
42
43    let reader = io::BufReader::new(file);
44    let mut lines = Vec::new();
45    let mut bytes_read = offset;
46
47    for line_result in reader.lines() {
48        let line = line_result?;
49        // +1 for the newline character
50        bytes_read += line.len() as i64 + 1;
51        if line.trim().is_empty() {
52            continue;
53        }
54        if let Ok(value) = serde_json::from_str::<serde_json::Value>(&line) {
55            lines.push(value);
56        }
57    }
58
59    Ok((lines, bytes_read))
60}
61
/// Appends `json` followed by a newline to the file at `pending_path`,
/// creating the file when it does not exist yet.
///
/// # Errors
///
/// Returns any I/O error from opening or writing the file.
pub fn append_pending(pending_path: &Path, json: &str) -> Result<(), io::Error> {
    let mut options = OpenOptions::new();
    options.append(true).create(true);

    let mut queue_file = options.open(pending_path)?;
    writeln!(queue_file, "{json}")
}
70
/// Reads every non-blank line from the file at `pending_path`, deletes the
/// file, and returns the lines in file order.
///
/// A missing file yields an empty vector and nothing is deleted.
///
/// # Errors
///
/// Returns any I/O error from reading or removing the file.
pub fn drain_pending(pending_path: &Path) -> Result<Vec<String>, io::Error> {
    let mut queued = Vec::new();

    if pending_path.exists() {
        let raw = fs::read_to_string(pending_path)?;
        for entry in raw.lines() {
            // Whitespace-only lines carry no event.
            if entry.trim().is_empty() {
                continue;
            }
            queued.push(entry.to_string());
        }
        fs::remove_file(pending_path)?;
    }

    Ok(queued)
}
84
85pub async fn run_stream(
86    project_root: &Path,
87    event_type: &str,
88) -> Result<(), Box<dyn std::error::Error>> {
89    // 1. Read HookEvent from stdin
90    let mut input = String::new();
91    io::stdin().read_to_string(&mut input)?;
92    let hook_event = parse_hook_event(&input)?;
93
94    // 2. Create session dir
95    let session_dir = project_root
96        .join(".tracevault")
97        .join("sessions")
98        .join(&hook_event.session_id);
99    fs::create_dir_all(&session_dir)?;
100
101    // 3. Get event_index
102    let counter_path = session_dir.join(".event_counter");
103    let event_index = next_event_index(&counter_path)?;
104
105    // 4. Read new transcript lines
106    let transcript_path = Path::new(&hook_event.transcript_path);
107    let offset_path = session_dir.join(".stream_offset");
108    let (transcript_lines, new_offset) = read_new_transcript_lines(transcript_path, &offset_path)?;
109
110    // 5. Build StreamEventRequest
111    let stream_event_type = match event_type {
112        "notification" => StreamEventType::SessionStart,
113        "stop" => StreamEventType::SessionEnd,
114        _ => StreamEventType::ToolUse,
115    };
116
117    let req = StreamEventRequest {
118        protocol_version: 1,
119        event_type: stream_event_type,
120        session_id: hook_event.session_id.clone(),
121        timestamp: chrono::Utc::now(),
122        hook_event_name: Some(hook_event.hook_event_name.clone()),
123        tool_name: hook_event.tool_name.clone(),
124        tool_input: hook_event.tool_input.clone(),
125        tool_response: hook_event.tool_response.clone(),
126        event_index: Some(event_index),
127        transcript_lines: if transcript_lines.is_empty() {
128            None
129        } else {
130            Some(transcript_lines)
131        },
132        transcript_offset: Some(new_offset),
133        model: None,
134        cwd: Some(hook_event.cwd.clone()),
135        final_stats: None,
136    };
137
138    // 6. Resolve credentials
139    let (server_url, token) = crate::api_client::resolve_credentials(project_root);
140
141    // 7. Load config for org_slug and repo_id
142    let config =
143        crate::config::TracevaultConfig::load(project_root).ok_or("TracevaultConfig not found")?;
144    let org_slug = config
145        .org_slug
146        .as_deref()
147        .ok_or("org_slug not configured")?;
148    let repo_id = config.repo_id.as_deref().ok_or("repo_id not configured")?;
149
150    // 8. Create ApiClient
151    let server_url = server_url.ok_or("server_url not configured")?;
152    let client = crate::api_client::ApiClient::new(&server_url, token.as_deref());
153
154    // 9. Try drain pending queue and send
155    let pending_path = session_dir.join("pending.jsonl");
156    let pending_events = drain_pending(&pending_path)?;
157
158    let mut send_failed = false;
159
160    // Send pending events first
161    for pending_json in &pending_events {
162        if let Ok(pending_req) = serde_json::from_str::<StreamEventRequest>(pending_json) {
163            if client
164                .stream_event(org_slug, repo_id, &pending_req)
165                .await
166                .is_err()
167            {
168                // Re-queue all remaining pending events
169                for evt in &pending_events {
170                    append_pending(&pending_path, evt)?;
171                }
172                send_failed = true;
173                break;
174            }
175        }
176    }
177
178    // Send current event
179    let req_json = serde_json::to_string(&req)?;
180    if send_failed {
181        append_pending(&pending_path, &req_json)?;
182    } else {
183        match client.stream_event(org_slug, repo_id, &req).await {
184            Ok(_) => {
185                // 10. On success update .stream_offset
186                fs::write(&offset_path, new_offset.to_string())?;
187            }
188            Err(_) => {
189                // 11. On failure append to pending.jsonl
190                append_pending(&pending_path, &req_json)?;
191            }
192        }
193    }
194
195    // 12. Always print HookResponse::allow() to stdout
196    let response = HookResponse::allow();
197    println!("{}", serde_json::to_string(&response)?);
198
199    Ok(())
200}