1use std::cmp::Reverse;
2use std::fs;
3use std::io::{BufRead, BufReader, Read};
4use std::path::{Path, PathBuf};
5use std::process::{Command, Stdio};
6use std::time::SystemTime;
7
8use serde_json::Value;
9use walkdir::WalkDir;
10
11use crate::error::{Result, XurlError};
12use crate::jsonl;
13use crate::model::{ProviderKind, ResolutionMeta, ResolvedThread, WriteRequest, WriteResult};
14use crate::provider::{Provider, WriteEventSink, append_passthrough_args};
15
16#[derive(Debug, Clone)]
17pub struct PiProvider {
18 root: PathBuf,
19}
20
21impl PiProvider {
22 pub fn new(root: impl Into<PathBuf>) -> Self {
23 Self { root: root.into() }
24 }
25
26 fn sessions_root(&self) -> PathBuf {
27 self.root.join("sessions")
28 }
29
30 fn has_session_id(path: &Path, session_id: &str) -> bool {
31 let file = match fs::File::open(path) {
32 Ok(file) => file,
33 Err(_) => return false,
34 };
35 let reader = BufReader::new(file);
36
37 let Some(first_non_empty) = reader
38 .lines()
39 .take(20)
40 .filter_map(std::result::Result::ok)
41 .find(|line| !line.trim().is_empty())
42 else {
43 return false;
44 };
45
46 let Ok(header) = serde_json::from_str::<Value>(&first_non_empty) else {
47 return false;
48 };
49
50 header.get("type").and_then(Value::as_str) == Some("session")
51 && header
52 .get("id")
53 .and_then(Value::as_str)
54 .is_some_and(|id| id.eq_ignore_ascii_case(session_id))
55 }
56
57 fn find_candidates(sessions_root: &Path, session_id: &str) -> Vec<PathBuf> {
58 if !sessions_root.exists() {
59 return Vec::new();
60 }
61
62 WalkDir::new(sessions_root)
63 .into_iter()
64 .filter_map(std::result::Result::ok)
65 .filter(|entry| entry.file_type().is_file())
66 .map(|entry| entry.into_path())
67 .filter(|path| {
68 path.extension()
69 .and_then(|ext| ext.to_str())
70 .is_some_and(|ext| ext == "jsonl")
71 })
72 .filter(|path| Self::has_session_id(path, session_id))
73 .collect()
74 }
75
76 fn choose_latest(paths: Vec<PathBuf>) -> Option<(PathBuf, usize)> {
77 if paths.is_empty() {
78 return None;
79 }
80
81 let mut scored = paths
82 .into_iter()
83 .map(|path| {
84 let modified = fs::metadata(&path)
85 .and_then(|meta| meta.modified())
86 .unwrap_or(SystemTime::UNIX_EPOCH);
87 (path, modified)
88 })
89 .collect::<Vec<_>>();
90 scored.sort_by_key(|(_, modified)| Reverse(*modified));
91 let count = scored.len();
92 scored.into_iter().next().map(|(path, _)| (path, count))
93 }
94
95 fn pi_bin() -> String {
96 std::env::var("XURL_PI_BIN").unwrap_or_else(|_| "pi".to_string())
97 }
98
99 fn spawn_pi_command(args: &[String]) -> Result<std::process::Child> {
100 let bin = Self::pi_bin();
101 let mut command = Command::new(&bin);
102 command
103 .args(args)
104 .stdin(Stdio::null())
105 .stdout(Stdio::piped())
106 .stderr(Stdio::piped());
107 command.spawn().map_err(|source| {
108 if source.kind() == std::io::ErrorKind::NotFound {
109 XurlError::CommandNotFound { command: bin }
110 } else {
111 XurlError::Io {
112 path: PathBuf::from(bin),
113 source,
114 }
115 }
116 })
117 }
118
119 fn extract_assistant_text(message: &Value) -> Option<String> {
120 if message.get("role").and_then(Value::as_str) != Some("assistant") {
121 return None;
122 }
123
124 if let Some(content) = message.get("content").and_then(Value::as_str) {
125 if content.is_empty() {
126 return None;
127 }
128 return Some(content.to_string());
129 }
130
131 let content = message.get("content")?.as_array()?;
132 let text = content
133 .iter()
134 .filter_map(|item| {
135 if item.get("type").and_then(Value::as_str) == Some("text") {
136 item.get("text").and_then(Value::as_str)
137 } else {
138 None
139 }
140 })
141 .collect::<Vec<_>>()
142 .join("");
143 if text.is_empty() { None } else { Some(text) }
144 }
145
146 fn run_write(
147 &self,
148 args: &[String],
149 req: &WriteRequest,
150 sink: &mut dyn WriteEventSink,
151 warnings: Vec<String>,
152 ) -> Result<WriteResult> {
153 let mut child = Self::spawn_pi_command(args)?;
154 let stdout = child
155 .stdout
156 .take()
157 .ok_or_else(|| XurlError::WriteProtocol("pi stdout pipe is unavailable".to_string()))?;
158 let stderr = child
159 .stderr
160 .take()
161 .ok_or_else(|| XurlError::WriteProtocol("pi stderr pipe is unavailable".to_string()))?;
162 let stderr_handle = std::thread::spawn(move || {
163 let mut reader = BufReader::new(stderr);
164 let mut content = String::new();
165 let _ = reader.read_to_string(&mut content);
166 content
167 });
168
169 let mut session_id = req.session_id.clone();
170 let mut final_text = None::<String>;
171 let mut streamed_text = String::new();
172 let mut streamed_delta = false;
173 let stream_path = Path::new("<pi:stdout>");
174 let reader = BufReader::new(stdout);
175 jsonl::parse_jsonl_reader(stream_path, reader, |_, value| {
176 let Some(event_type) = value.get("type").and_then(Value::as_str) else {
177 return Ok(());
178 };
179
180 match event_type {
181 "session" => {
182 if let Some(current_session_id) = value.get("id").and_then(Value::as_str)
183 && session_id.as_deref() != Some(current_session_id)
184 {
185 sink.on_session_ready(ProviderKind::Pi, current_session_id)?;
186 session_id = Some(current_session_id.to_string());
187 }
188 }
189 "message_update" => {
190 if value
191 .get("assistantMessageEvent")
192 .and_then(Value::as_object)
193 .and_then(|event| event.get("type"))
194 .and_then(Value::as_str)
195 == Some("text_delta")
196 && let Some(delta) = value
197 .get("assistantMessageEvent")
198 .and_then(Value::as_object)
199 .and_then(|event| event.get("delta"))
200 .and_then(Value::as_str)
201 && !delta.is_empty()
202 {
203 sink.on_text_delta(delta)?;
204 streamed_text.push_str(delta);
205 final_text = Some(streamed_text.clone());
206 streamed_delta = true;
207 }
208 }
209 "message_end" | "turn_end" => {
210 if streamed_delta {
211 return Ok(());
212 }
213 if let Some(text) = value
214 .get("message")
215 .and_then(Self::extract_assistant_text)
216 .filter(|text| !text.is_empty())
217 {
218 sink.on_text_delta(&text)?;
219 final_text = Some(text);
220 }
221 }
222 _ => {}
223 }
224 Ok(())
225 })?;
226
227 let status = child.wait().map_err(|source| XurlError::Io {
228 path: PathBuf::from(Self::pi_bin()),
229 source,
230 })?;
231 let stderr_content = stderr_handle.join().unwrap_or_default();
232 if !status.success() {
233 return Err(XurlError::CommandFailed {
234 command: format!("{} {}", Self::pi_bin(), args.join(" ")),
235 code: status.code(),
236 stderr: stderr_content.trim().to_string(),
237 });
238 }
239
240 let session_id = if let Some(session_id) = session_id {
241 session_id
242 } else {
243 return Err(XurlError::WriteProtocol(
244 "missing session id in pi event stream".to_string(),
245 ));
246 };
247
248 Ok(WriteResult {
249 provider: ProviderKind::Pi,
250 session_id,
251 final_text,
252 warnings,
253 })
254 }
255}
256
257impl Provider for PiProvider {
258 fn kind(&self) -> ProviderKind {
259 ProviderKind::Pi
260 }
261
262 fn resolve(&self, session_id: &str) -> Result<ResolvedThread> {
263 let sessions_root = self.sessions_root();
264 let candidates = Self::find_candidates(&sessions_root, session_id);
265
266 if let Some((selected, count)) = Self::choose_latest(candidates) {
267 let mut metadata = ResolutionMeta {
268 source: "pi:sessions".to_string(),
269 candidate_count: count,
270 warnings: Vec::new(),
271 };
272
273 if count > 1 {
274 metadata.warnings.push(format!(
275 "multiple matches found ({count}) for session_id={session_id}; selected latest: {}",
276 selected.display()
277 ));
278 }
279
280 return Ok(ResolvedThread {
281 provider: ProviderKind::Pi,
282 session_id: session_id.to_string(),
283 path: selected,
284 metadata,
285 });
286 }
287
288 Err(XurlError::ThreadNotFound {
289 provider: ProviderKind::Pi.to_string(),
290 session_id: session_id.to_string(),
291 searched_roots: vec![sessions_root],
292 })
293 }
294
295 fn write(&self, req: &WriteRequest, sink: &mut dyn WriteEventSink) -> Result<WriteResult> {
296 if let Some(role) = req.options.role.as_deref() {
297 return Err(XurlError::InvalidMode(format!(
298 "provider `{}` does not support role-based write URI (`{role}`)",
299 ProviderKind::Pi
300 )));
301 }
302 let warnings = Vec::new();
303 let mut args = Vec::new();
304 if let Some(session_id) = req.session_id.as_deref() {
305 let resolved = self.resolve(session_id)?;
306 let session_path = resolved.path.to_string_lossy().to_string();
307 args.push("--session".to_string());
308 args.push(session_path);
309 args.push("-p".to_string());
310 args.push(req.prompt.clone());
311 args.push("--mode".to_string());
312 args.push("json".to_string());
313 } else {
314 args.push("-p".to_string());
315 args.push(req.prompt.clone());
316 args.push("--mode".to_string());
317 args.push("json".to_string());
318 }
319 append_passthrough_args(&mut args, &req.options.params);
320 self.run_write(&args, req, sink, warnings)
321 }
322}
323
324#[cfg(test)]
325mod tests {
326 use std::fs;
327 use std::path::{Path, PathBuf};
328 use std::thread;
329 use std::time::Duration;
330
331 use tempfile::tempdir;
332
333 use crate::provider::Provider;
334 use crate::provider::pi::PiProvider;
335
336 fn write_session(root: &Path, session_dir: &str, file_name: &str, session_id: &str) -> PathBuf {
337 let path = root.join("sessions").join(session_dir).join(file_name);
338 fs::create_dir_all(path.parent().expect("parent")).expect("mkdir");
339 fs::write(
340 &path,
341 format!(
342 "{{\"type\":\"session\",\"version\":3,\"id\":\"{session_id}\",\"timestamp\":\"2026-02-23T13:00:12.780Z\",\"cwd\":\"/tmp/project\"}}\n{{\"type\":\"message\",\"id\":\"a1b2c3d4\",\"parentId\":null,\"timestamp\":\"2026-02-23T13:00:13.000Z\",\"message\":{{\"role\":\"user\",\"content\":[{{\"type\":\"text\",\"text\":\"hello\"}}],\"timestamp\":1771851717843}}}}\n"
343 ),
344 )
345 .expect("write");
346 path
347 }
348
349 #[test]
350 fn resolves_from_sessions_directory() {
351 let temp = tempdir().expect("tempdir");
352 let session_id = "12cb4c19-2774-4de4-a0d0-9fa32fbae29f";
353 let path = write_session(
354 temp.path(),
355 "--Users-xuanwo-Code-xurl--",
356 "2026-02-23T13-00-12-780Z_12cb4c19-2774-4de4-a0d0-9fa32fbae29f.jsonl",
357 session_id,
358 );
359
360 let provider = PiProvider::new(temp.path());
361 let resolved = provider
362 .resolve(session_id)
363 .expect("resolve should succeed");
364
365 assert_eq!(resolved.path, path);
366 assert_eq!(resolved.metadata.source, "pi:sessions");
367 }
368
369 #[test]
370 fn selects_latest_when_multiple_matches_exist() {
371 let temp = tempdir().expect("tempdir");
372 let session_id = "12cb4c19-2774-4de4-a0d0-9fa32fbae29f";
373
374 let first = write_session(
375 temp.path(),
376 "--Users-xuanwo-Code-project-a--",
377 "2026-02-23T13-00-12-780Z_12cb4c19-2774-4de4-a0d0-9fa32fbae29f.jsonl",
378 session_id,
379 );
380 thread::sleep(Duration::from_millis(15));
381 let second = write_session(
382 temp.path(),
383 "--Users-xuanwo-Code-project-b--",
384 "2026-02-23T13-10-12-780Z_12cb4c19-2774-4de4-a0d0-9fa32fbae29f.jsonl",
385 session_id,
386 );
387
388 let provider = PiProvider::new(temp.path());
389 let resolved = provider
390 .resolve(session_id)
391 .expect("resolve should succeed");
392
393 assert_eq!(resolved.path, second);
394 assert_eq!(resolved.metadata.candidate_count, 2);
395 assert_eq!(resolved.metadata.warnings.len(), 1);
396 assert!(resolved.metadata.warnings[0].contains("multiple matches"));
397 assert!(first.exists());
398 }
399
400 #[test]
401 fn missing_thread_returns_not_found() {
402 let temp = tempdir().expect("tempdir");
403 let provider = PiProvider::new(temp.path());
404 let err = provider
405 .resolve("12cb4c19-2774-4de4-a0d0-9fa32fbae29f")
406 .expect_err("must fail");
407 assert!(format!("{err}").contains("thread not found"));
408 }
409}