1use std::cmp::Reverse;
2use std::fs;
3use std::io::{BufRead, BufReader, Read};
4use std::path::{Path, PathBuf};
5use std::process::{Command, Stdio};
6use std::time::SystemTime;
7
8use serde_json::Value;
9
10use crate::error::{Result, XurlError};
11use crate::model::{ProviderKind, ResolutionMeta, ResolvedThread, WriteRequest, WriteResult};
12use crate::provider::{
13 Provider, WriteEventSink, append_passthrough_args, append_passthrough_args_excluding,
14};
15
/// Provider that locates and drives GitHub Copilot CLI sessions.
///
/// Session event logs are looked up beneath the `session-state` directory of
/// `root`.
#[derive(Clone, Debug)]
pub struct CopilotProvider {
    /// Base directory that contains the `session-state` folder.
    root: PathBuf,
}
20
21impl CopilotProvider {
22 pub fn new(root: impl Into<PathBuf>) -> Self {
23 Self { root: root.into() }
24 }
25
26 fn session_state_root(&self) -> PathBuf {
27 self.root.join("session-state")
28 }
29
/// Builds the per-session directory layout path: `<root>/<session_id>/events.jsonl`.
///
/// The path is only computed; nothing is checked on disk.
fn direct_event_path(root: &Path, session_id: &str) -> PathBuf {
    let mut path = root.to_path_buf();
    path.push(session_id);
    path.push("events.jsonl");
    path
}
33
/// Builds the flat-file layout path: `<root>/<session_id>.jsonl`.
///
/// The path is only computed; nothing is checked on disk.
fn legacy_event_path(root: &Path, session_id: &str) -> PathBuf {
    const EXTENSION: &str = ".jsonl";

    let mut file_name = String::with_capacity(session_id.len() + EXTENSION.len());
    file_name.push_str(session_id);
    file_name.push_str(EXTENSION);
    root.join(file_name)
}
37
38 fn candidate_paths(root: &Path, session_id: &str) -> Vec<PathBuf> {
39 [
40 Self::direct_event_path(root, session_id),
41 Self::legacy_event_path(root, session_id),
42 ]
43 .into_iter()
44 .filter(|path| path.exists())
45 .collect()
46 }
47
/// Picks the most recently modified path and reports how many candidates there were.
///
/// Returns `None` for an empty list. A path whose modification time cannot be
/// read is treated as modified at the Unix epoch. When several paths share the
/// newest timestamp, the one that appears first in `paths` is chosen.
fn choose_latest(paths: Vec<PathBuf>) -> Option<(PathBuf, usize)> {
    let total = paths.len();
    let mut newest: Option<(PathBuf, SystemTime)> = None;

    for candidate in paths {
        let stamp = match fs::metadata(&candidate).and_then(|meta| meta.modified()) {
            Ok(time) => time,
            Err(_) => SystemTime::UNIX_EPOCH,
        };

        // Replace only on a strictly newer timestamp so that ties keep the
        // earliest candidate.
        let is_newer = match &newest {
            Some((_, best)) => stamp > *best,
            None => true,
        };
        if is_newer {
            newest = Some((candidate, stamp));
        }
    }

    newest.map(|(path, _)| (path, total))
}
66
/// Returns the copilot executable to run.
///
/// Uses the `XURL_COPILOT_BIN` environment variable when it can be read as a
/// string, otherwise falls back to `copilot`.
fn copilot_bin() -> String {
    match std::env::var("XURL_COPILOT_BIN") {
        Ok(configured) => configured,
        Err(_) => String::from("copilot"),
    }
}
70
71 fn spawn_copilot_command(args: &[String]) -> Result<std::process::Child> {
72 let bin = Self::copilot_bin();
73 let mut command = Command::new(&bin);
74 command
75 .args(args)
76 .stdin(Stdio::null())
77 .stdout(Stdio::piped())
78 .stderr(Stdio::piped());
79 command.spawn().map_err(|source| {
80 if source.kind() == std::io::ErrorKind::NotFound {
81 XurlError::CommandNotFound { command: bin }
82 } else {
83 XurlError::Io {
84 path: PathBuf::from(bin),
85 source,
86 }
87 }
88 })
89 }
90
91 fn session_id_from_event(value: &Value) -> Option<&str> {
92 value.get("sessionId").and_then(Value::as_str).or_else(|| {
93 value
94 .get("data")
95 .and_then(|data| data.get("sessionId"))
96 .and_then(Value::as_str)
97 })
98 }
99
100 fn assistant_delta(value: &Value) -> Option<&str> {
101 if value.get("type").and_then(Value::as_str) != Some("assistant.message_delta") {
102 return None;
103 }
104
105 value
106 .get("data")
107 .and_then(|data| data.get("deltaContent"))
108 .and_then(Value::as_str)
109 }
110
111 fn assistant_message_text(value: &Value) -> Option<&str> {
112 if value.get("type").and_then(Value::as_str) != Some("assistant.message") {
113 return None;
114 }
115
116 value
117 .get("data")
118 .and_then(|data| data.get("content"))
119 .and_then(Value::as_str)
120 }
121
122 fn run_write(
123 &self,
124 args: &[String],
125 req: &WriteRequest,
126 sink: &mut dyn WriteEventSink,
127 warnings: Vec<String>,
128 ) -> Result<WriteResult> {
129 let mut child = Self::spawn_copilot_command(args)?;
130 let stdout = child.stdout.take().ok_or_else(|| {
131 XurlError::WriteProtocol("copilot stdout pipe is unavailable".to_string())
132 })?;
133 let stderr = child.stderr.take().ok_or_else(|| {
134 XurlError::WriteProtocol("copilot stderr pipe is unavailable".to_string())
135 })?;
136 let stderr_handle = std::thread::spawn(move || {
137 let mut reader = BufReader::new(stderr);
138 let mut content = String::new();
139 let _ = reader.read_to_string(&mut content);
140 content
141 });
142
143 let stream_path = Path::new("<copilot:stdout>");
144 let mut session_id = req.session_id.clone();
145 let mut final_text = None::<String>;
146 let mut streamed_text = String::new();
147 let mut saw_json_event = false;
148 let reader = BufReader::new(stdout);
149 for line in reader.lines() {
150 let line = line.map_err(|source| XurlError::Io {
151 path: stream_path.to_path_buf(),
152 source,
153 })?;
154 let trimmed = line.trim();
155 if trimmed.is_empty() {
156 continue;
157 }
158
159 let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
160 continue;
161 };
162 saw_json_event = true;
163
164 if let Some(current_session_id) = Self::session_id_from_event(&value)
165 && session_id.as_deref() != Some(current_session_id)
166 {
167 sink.on_session_ready(ProviderKind::Copilot, current_session_id)?;
168 session_id = Some(current_session_id.to_string());
169 }
170
171 if let Some(delta) = Self::assistant_delta(&value)
172 && !delta.is_empty()
173 {
174 sink.on_text_delta(delta)?;
175 streamed_text.push_str(delta);
176 final_text = Some(streamed_text.clone());
177 }
178
179 if let Some(text) = Self::assistant_message_text(&value)
180 && !text.is_empty()
181 {
182 final_text = Some(text.to_string());
183 }
184 }
185
186 let status = child.wait().map_err(|source| XurlError::Io {
187 path: PathBuf::from(Self::copilot_bin()),
188 source,
189 })?;
190 let stderr_content = stderr_handle.join().unwrap_or_default();
191 if !status.success() {
192 return Err(XurlError::CommandFailed {
193 command: format!("{} {}", Self::copilot_bin(), args.join(" ")),
194 code: status.code(),
195 stderr: stderr_content.trim().to_string(),
196 });
197 }
198
199 if !saw_json_event {
200 return Err(XurlError::WriteProtocol(
201 "copilot output does not contain JSON events".to_string(),
202 ));
203 }
204
205 let session_id = if let Some(session_id) = session_id {
206 session_id
207 } else {
208 return Err(XurlError::WriteProtocol(
209 "missing session id in copilot event stream".to_string(),
210 ));
211 };
212
213 Ok(WriteResult {
214 provider: ProviderKind::Copilot,
215 session_id,
216 final_text,
217 warnings,
218 })
219 }
220}
221
222impl Provider for CopilotProvider {
/// Identifies this provider as [`ProviderKind::Copilot`].
fn kind(&self) -> ProviderKind {
    ProviderKind::Copilot
}
226
227 fn resolve(&self, session_id: &str) -> Result<ResolvedThread> {
228 let sessions_root = self.session_state_root();
229 let candidates = Self::candidate_paths(&sessions_root, session_id);
230
231 if let Some((selected, count)) = Self::choose_latest(candidates) {
232 let mut metadata = ResolutionMeta {
233 source: "copilot:session-state".to_string(),
234 candidate_count: count,
235 warnings: Vec::new(),
236 };
237 if count > 1 {
238 metadata.warnings.push(format!(
239 "multiple matches found ({count}) for session_id={session_id}; selected latest: {}",
240 selected.display()
241 ));
242 }
243
244 return Ok(ResolvedThread {
245 provider: ProviderKind::Copilot,
246 session_id: session_id.to_string(),
247 path: selected,
248 metadata,
249 });
250 }
251
252 Err(XurlError::ThreadNotFound {
253 provider: ProviderKind::Copilot.to_string(),
254 session_id: session_id.to_string(),
255 searched_roots: vec![
256 Self::direct_event_path(&sessions_root, session_id),
257 Self::legacy_event_path(&sessions_root, session_id),
258 ],
259 })
260 }
261
262 fn write(&self, req: &WriteRequest, sink: &mut dyn WriteEventSink) -> Result<WriteResult> {
263 let mut warnings = Vec::new();
264 let mut args = vec![
265 "-p".to_string(),
266 req.prompt.clone(),
267 "--output-format".to_string(),
268 "json".to_string(),
269 "--allow-all-tools".to_string(),
270 ];
271
272 if let Some(role) = req.options.role.as_deref() {
273 args.push("--agent".to_string());
274 args.push(role.to_string());
275 let ignored =
276 append_passthrough_args_excluding(&mut args, &req.options.params, &["agent"]);
277 if !ignored.is_empty() {
278 warnings.push(
279 "ignored query parameter `agent` because URI role is already set".to_string(),
280 );
281 }
282 } else {
283 append_passthrough_args(&mut args, &req.options.params);
284 }
285
286 if let Some(session_id) = req.session_id.as_deref() {
287 args.push("--resume".to_string());
288 args.push(session_id.to_string());
289 }
290
291 self.run_write(&args, req, sink, warnings)
292 }
293}
294
#[cfg(test)]
mod tests {
    use std::fs;

    use tempfile::tempdir;

    use crate::provider::Provider;
    use crate::provider::copilot::CopilotProvider;

    /// Session id used by every fixture in this module.
    const SESSION_ID: &str = "688628a1-407a-4b4e-b24a-1a250ebf864f";

    /// Creates `relative` under `root` (including parent directories) with a
    /// one-line JSON body and returns the full path.
    fn seed_thread_file(root: &std::path::Path, relative: &str) -> std::path::PathBuf {
        let thread_file = root.join(relative);
        fs::create_dir_all(thread_file.parent().expect("parent")).expect("mkdir");
        fs::write(&thread_file, "{}\n").expect("write");
        thread_file
    }

    /// The directory layout `<id>/events.jsonl` resolves and reports its source.
    #[test]
    fn resolves_from_session_state_events_jsonl() {
        let temp = tempdir().expect("tempdir");
        let relative = format!("session-state/{SESSION_ID}/events.jsonl");
        let thread_file = seed_thread_file(temp.path(), &relative);

        let resolved = CopilotProvider::new(temp.path())
            .resolve(SESSION_ID)
            .expect("resolve should succeed");

        assert_eq!(resolved.path, thread_file);
        assert_eq!(resolved.metadata.source, "copilot:session-state");
    }

    /// The flat layout `<id>.jsonl` resolves to that file.
    #[test]
    fn resolves_from_legacy_jsonl_file() {
        let temp = tempdir().expect("tempdir");
        let relative = format!("session-state/{SESSION_ID}.jsonl");
        let thread_file = seed_thread_file(temp.path(), &relative);

        let resolved = CopilotProvider::new(temp.path())
            .resolve(SESSION_ID)
            .expect("resolve should succeed");

        assert_eq!(resolved.path, thread_file);
    }
}