Skip to main content

xurl_core/provider/
copilot.rs

1use std::cmp::Reverse;
2use std::fs;
3use std::io::{BufRead, BufReader, Read};
4use std::path::{Path, PathBuf};
5use std::process::{Command, Stdio};
6use std::time::SystemTime;
7
8use serde_json::Value;
9
10use crate::error::{Result, XurlError};
11use crate::model::{ProviderKind, ResolutionMeta, ResolvedThread, WriteRequest, WriteResult};
12use crate::provider::{
13    Provider, WriteEventSink, append_passthrough_args, append_passthrough_args_excluding,
14};
15
/// Provider backed by Copilot session data stored on disk.
///
/// Session event logs are looked up beneath `<root>/session-state`.
#[derive(Clone, Debug)]
pub struct CopilotProvider {
    /// Base directory that contains the `session-state` folder.
    root: PathBuf,
}
20
21impl CopilotProvider {
22    pub fn new(root: impl Into<PathBuf>) -> Self {
23        Self { root: root.into() }
24    }
25
26    fn session_state_root(&self) -> PathBuf {
27        self.root.join("session-state")
28    }
29
30    fn direct_event_path(root: &Path, session_id: &str) -> PathBuf {
31        root.join(session_id).join("events.jsonl")
32    }
33
34    fn legacy_event_path(root: &Path, session_id: &str) -> PathBuf {
35        root.join(format!("{session_id}.jsonl"))
36    }
37
38    fn candidate_paths(root: &Path, session_id: &str) -> Vec<PathBuf> {
39        [
40            Self::direct_event_path(root, session_id),
41            Self::legacy_event_path(root, session_id),
42        ]
43        .into_iter()
44        .filter(|path| path.exists())
45        .collect()
46    }
47
48    fn choose_latest(paths: Vec<PathBuf>) -> Option<(PathBuf, usize)> {
49        if paths.is_empty() {
50            return None;
51        }
52
53        let mut scored = paths
54            .into_iter()
55            .map(|path| {
56                let modified = fs::metadata(&path)
57                    .and_then(|meta| meta.modified())
58                    .unwrap_or(SystemTime::UNIX_EPOCH);
59                (path, modified)
60            })
61            .collect::<Vec<_>>();
62        scored.sort_by_key(|(_, modified)| Reverse(*modified));
63        let count = scored.len();
64        scored.into_iter().next().map(|(path, _)| (path, count))
65    }
66
67    fn copilot_bin() -> String {
68        std::env::var("XURL_COPILOT_BIN").unwrap_or_else(|_| "copilot".to_string())
69    }
70
71    fn spawn_copilot_command(args: &[String]) -> Result<std::process::Child> {
72        let bin = Self::copilot_bin();
73        let mut command = Command::new(&bin);
74        command
75            .args(args)
76            .stdin(Stdio::null())
77            .stdout(Stdio::piped())
78            .stderr(Stdio::piped());
79        command.spawn().map_err(|source| {
80            if source.kind() == std::io::ErrorKind::NotFound {
81                XurlError::CommandNotFound { command: bin }
82            } else {
83                XurlError::Io {
84                    path: PathBuf::from(bin),
85                    source,
86                }
87            }
88        })
89    }
90
91    fn session_id_from_event(value: &Value) -> Option<&str> {
92        value.get("sessionId").and_then(Value::as_str).or_else(|| {
93            value
94                .get("data")
95                .and_then(|data| data.get("sessionId"))
96                .and_then(Value::as_str)
97        })
98    }
99
100    fn assistant_delta(value: &Value) -> Option<&str> {
101        if value.get("type").and_then(Value::as_str) != Some("assistant.message_delta") {
102            return None;
103        }
104
105        value
106            .get("data")
107            .and_then(|data| data.get("deltaContent"))
108            .and_then(Value::as_str)
109    }
110
111    fn assistant_message_text(value: &Value) -> Option<&str> {
112        if value.get("type").and_then(Value::as_str) != Some("assistant.message") {
113            return None;
114        }
115
116        value
117            .get("data")
118            .and_then(|data| data.get("content"))
119            .and_then(Value::as_str)
120    }
121
122    fn run_write(
123        &self,
124        args: &[String],
125        req: &WriteRequest,
126        sink: &mut dyn WriteEventSink,
127        warnings: Vec<String>,
128    ) -> Result<WriteResult> {
129        let mut child = Self::spawn_copilot_command(args)?;
130        let stdout = child.stdout.take().ok_or_else(|| {
131            XurlError::WriteProtocol("copilot stdout pipe is unavailable".to_string())
132        })?;
133        let stderr = child.stderr.take().ok_or_else(|| {
134            XurlError::WriteProtocol("copilot stderr pipe is unavailable".to_string())
135        })?;
136        let stderr_handle = std::thread::spawn(move || {
137            let mut reader = BufReader::new(stderr);
138            let mut content = String::new();
139            let _ = reader.read_to_string(&mut content);
140            content
141        });
142
143        let stream_path = Path::new("<copilot:stdout>");
144        let mut session_id = req.session_id.clone();
145        let mut final_text = None::<String>;
146        let mut streamed_text = String::new();
147        let mut saw_json_event = false;
148        let reader = BufReader::new(stdout);
149        for line in reader.lines() {
150            let line = line.map_err(|source| XurlError::Io {
151                path: stream_path.to_path_buf(),
152                source,
153            })?;
154            let trimmed = line.trim();
155            if trimmed.is_empty() {
156                continue;
157            }
158
159            let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
160                continue;
161            };
162            saw_json_event = true;
163
164            if let Some(current_session_id) = Self::session_id_from_event(&value)
165                && session_id.as_deref() != Some(current_session_id)
166            {
167                sink.on_session_ready(ProviderKind::Copilot, current_session_id)?;
168                session_id = Some(current_session_id.to_string());
169            }
170
171            if let Some(delta) = Self::assistant_delta(&value)
172                && !delta.is_empty()
173            {
174                sink.on_text_delta(delta)?;
175                streamed_text.push_str(delta);
176                final_text = Some(streamed_text.clone());
177            }
178
179            if let Some(text) = Self::assistant_message_text(&value)
180                && !text.is_empty()
181            {
182                final_text = Some(text.to_string());
183            }
184        }
185
186        let status = child.wait().map_err(|source| XurlError::Io {
187            path: PathBuf::from(Self::copilot_bin()),
188            source,
189        })?;
190        let stderr_content = stderr_handle.join().unwrap_or_default();
191        if !status.success() {
192            return Err(XurlError::CommandFailed {
193                command: format!("{} {}", Self::copilot_bin(), args.join(" ")),
194                code: status.code(),
195                stderr: stderr_content.trim().to_string(),
196            });
197        }
198
199        if !saw_json_event {
200            return Err(XurlError::WriteProtocol(
201                "copilot output does not contain JSON events".to_string(),
202            ));
203        }
204
205        let session_id = if let Some(session_id) = session_id {
206            session_id
207        } else {
208            return Err(XurlError::WriteProtocol(
209                "missing session id in copilot event stream".to_string(),
210            ));
211        };
212
213        Ok(WriteResult {
214            provider: ProviderKind::Copilot,
215            session_id,
216            final_text,
217            warnings,
218        })
219    }
220}
221
222impl Provider for CopilotProvider {
223    fn kind(&self) -> ProviderKind {
224        ProviderKind::Copilot
225    }
226
227    fn resolve(&self, session_id: &str) -> Result<ResolvedThread> {
228        let sessions_root = self.session_state_root();
229        let candidates = Self::candidate_paths(&sessions_root, session_id);
230
231        if let Some((selected, count)) = Self::choose_latest(candidates) {
232            let mut metadata = ResolutionMeta {
233                source: "copilot:session-state".to_string(),
234                candidate_count: count,
235                warnings: Vec::new(),
236            };
237            if count > 1 {
238                metadata.warnings.push(format!(
239                    "multiple matches found ({count}) for session_id={session_id}; selected latest: {}",
240                    selected.display()
241                ));
242            }
243
244            return Ok(ResolvedThread {
245                provider: ProviderKind::Copilot,
246                session_id: session_id.to_string(),
247                path: selected,
248                metadata,
249            });
250        }
251
252        Err(XurlError::ThreadNotFound {
253            provider: ProviderKind::Copilot.to_string(),
254            session_id: session_id.to_string(),
255            searched_roots: vec![
256                Self::direct_event_path(&sessions_root, session_id),
257                Self::legacy_event_path(&sessions_root, session_id),
258            ],
259        })
260    }
261
262    fn write(&self, req: &WriteRequest, sink: &mut dyn WriteEventSink) -> Result<WriteResult> {
263        let mut warnings = Vec::new();
264        let mut args = vec![
265            "-p".to_string(),
266            req.prompt.clone(),
267            "--output-format".to_string(),
268            "json".to_string(),
269            "--allow-all-tools".to_string(),
270        ];
271
272        if let Some(role) = req.options.role.as_deref() {
273            args.push("--agent".to_string());
274            args.push(role.to_string());
275            let ignored =
276                append_passthrough_args_excluding(&mut args, &req.options.params, &["agent"]);
277            if !ignored.is_empty() {
278                warnings.push(
279                    "ignored query parameter `agent` because URI role is already set".to_string(),
280                );
281            }
282        } else {
283            append_passthrough_args(&mut args, &req.options.params);
284        }
285
286        if let Some(session_id) = req.session_id.as_deref() {
287            args.push("--resume".to_string());
288            args.push(session_id.to_string());
289        }
290
291        self.run_write(&args, req, sink, warnings)
292    }
293}
294
#[cfg(test)]
mod tests {
    use std::fs;

    use tempfile::tempdir;

    use crate::provider::Provider;
    use crate::provider::copilot::CopilotProvider;

    /// Session id shared by the resolution tests.
    const SESSION_ID: &str = "688628a1-407a-4b4e-b24a-1a250ebf864f";

    /// Creates `relative` under `root` (including parent directories) holding a
    /// single empty JSON object line, and returns its full path.
    fn write_thread_file(root: &std::path::Path, relative: &str) -> std::path::PathBuf {
        let thread_file = root.join(relative);
        fs::create_dir_all(thread_file.parent().expect("parent")).expect("mkdir");
        fs::write(&thread_file, "{}\n").expect("write");
        thread_file
    }

    #[test]
    fn resolves_from_session_state_events_jsonl() {
        let temp = tempdir().expect("tempdir");
        let thread_file = write_thread_file(
            temp.path(),
            "session-state/688628a1-407a-4b4e-b24a-1a250ebf864f/events.jsonl",
        );

        let resolved = CopilotProvider::new(temp.path())
            .resolve(SESSION_ID)
            .expect("resolve should succeed");

        assert_eq!(resolved.path, thread_file);
        assert_eq!(resolved.metadata.source, "copilot:session-state");
    }

    #[test]
    fn resolves_from_legacy_jsonl_file() {
        let temp = tempdir().expect("tempdir");
        let thread_file = write_thread_file(
            temp.path(),
            "session-state/688628a1-407a-4b4e-b24a-1a250ebf864f.jsonl",
        );

        let resolved = CopilotProvider::new(temp.path())
            .resolve(SESSION_ID)
            .expect("resolve should succeed");

        assert_eq!(resolved.path, thread_file);
    }
}