Skip to main content

xurl_core/provider/
pi.rs

1use std::cmp::Reverse;
2use std::fs;
3use std::io::{BufRead, BufReader, Read};
4use std::path::{Path, PathBuf};
5use std::process::{Command, Stdio};
6use std::time::SystemTime;
7
8use serde_json::Value;
9use walkdir::WalkDir;
10
11use crate::error::{Result, XurlError};
12use crate::jsonl;
13use crate::model::{ProviderKind, ResolutionMeta, ResolvedThread, WriteRequest, WriteResult};
14use crate::provider::{Provider, WriteEventSink, append_passthrough_args};
15
/// Provider backed by a `pi` data directory.
///
/// Session files are looked up beneath `<root>/sessions`.
#[derive(Clone, Debug)]
pub struct PiProvider {
    /// Base directory that contains the `sessions` folder.
    root: PathBuf,
}
20
21impl PiProvider {
22    pub fn new(root: impl Into<PathBuf>) -> Self {
23        Self { root: root.into() }
24    }
25
26    fn sessions_root(&self) -> PathBuf {
27        self.root.join("sessions")
28    }
29
30    fn has_session_id(path: &Path, session_id: &str) -> bool {
31        let file = match fs::File::open(path) {
32            Ok(file) => file,
33            Err(_) => return false,
34        };
35        let reader = BufReader::new(file);
36
37        let Some(first_non_empty) = reader
38            .lines()
39            .take(20)
40            .filter_map(std::result::Result::ok)
41            .find(|line| !line.trim().is_empty())
42        else {
43            return false;
44        };
45
46        let Ok(header) = serde_json::from_str::<Value>(&first_non_empty) else {
47            return false;
48        };
49
50        header.get("type").and_then(Value::as_str) == Some("session")
51            && header
52                .get("id")
53                .and_then(Value::as_str)
54                .is_some_and(|id| id.eq_ignore_ascii_case(session_id))
55    }
56
57    fn find_candidates(sessions_root: &Path, session_id: &str) -> Vec<PathBuf> {
58        if !sessions_root.exists() {
59            return Vec::new();
60        }
61
62        WalkDir::new(sessions_root)
63            .into_iter()
64            .filter_map(std::result::Result::ok)
65            .filter(|entry| entry.file_type().is_file())
66            .map(|entry| entry.into_path())
67            .filter(|path| {
68                path.extension()
69                    .and_then(|ext| ext.to_str())
70                    .is_some_and(|ext| ext == "jsonl")
71            })
72            .filter(|path| Self::has_session_id(path, session_id))
73            .collect()
74    }
75
76    fn choose_latest(paths: Vec<PathBuf>) -> Option<(PathBuf, usize)> {
77        if paths.is_empty() {
78            return None;
79        }
80
81        let mut scored = paths
82            .into_iter()
83            .map(|path| {
84                let modified = fs::metadata(&path)
85                    .and_then(|meta| meta.modified())
86                    .unwrap_or(SystemTime::UNIX_EPOCH);
87                (path, modified)
88            })
89            .collect::<Vec<_>>();
90        scored.sort_by_key(|(_, modified)| Reverse(*modified));
91        let count = scored.len();
92        scored.into_iter().next().map(|(path, _)| (path, count))
93    }
94
95    fn pi_bin() -> String {
96        std::env::var("XURL_PI_BIN").unwrap_or_else(|_| "pi".to_string())
97    }
98
99    fn spawn_pi_command(args: &[String]) -> Result<std::process::Child> {
100        let bin = Self::pi_bin();
101        let mut command = Command::new(&bin);
102        command
103            .args(args)
104            .stdin(Stdio::null())
105            .stdout(Stdio::piped())
106            .stderr(Stdio::piped());
107        command.spawn().map_err(|source| {
108            if source.kind() == std::io::ErrorKind::NotFound {
109                XurlError::CommandNotFound { command: bin }
110            } else {
111                XurlError::Io {
112                    path: PathBuf::from(bin),
113                    source,
114                }
115            }
116        })
117    }
118
119    fn extract_assistant_text(message: &Value) -> Option<String> {
120        if message.get("role").and_then(Value::as_str) != Some("assistant") {
121            return None;
122        }
123
124        if let Some(content) = message.get("content").and_then(Value::as_str) {
125            if content.is_empty() {
126                return None;
127            }
128            return Some(content.to_string());
129        }
130
131        let content = message.get("content")?.as_array()?;
132        let text = content
133            .iter()
134            .filter_map(|item| {
135                if item.get("type").and_then(Value::as_str) == Some("text") {
136                    item.get("text").and_then(Value::as_str)
137                } else {
138                    None
139                }
140            })
141            .collect::<Vec<_>>()
142            .join("");
143        if text.is_empty() { None } else { Some(text) }
144    }
145
146    fn run_write(
147        &self,
148        args: &[String],
149        req: &WriteRequest,
150        sink: &mut dyn WriteEventSink,
151        warnings: Vec<String>,
152    ) -> Result<WriteResult> {
153        let mut child = Self::spawn_pi_command(args)?;
154        let stdout = child
155            .stdout
156            .take()
157            .ok_or_else(|| XurlError::WriteProtocol("pi stdout pipe is unavailable".to_string()))?;
158        let stderr = child
159            .stderr
160            .take()
161            .ok_or_else(|| XurlError::WriteProtocol("pi stderr pipe is unavailable".to_string()))?;
162        let stderr_handle = std::thread::spawn(move || {
163            let mut reader = BufReader::new(stderr);
164            let mut content = String::new();
165            let _ = reader.read_to_string(&mut content);
166            content
167        });
168
169        let mut session_id = req.session_id.clone();
170        let mut final_text = None::<String>;
171        let mut streamed_text = String::new();
172        let mut streamed_delta = false;
173        let stream_path = Path::new("<pi:stdout>");
174        let reader = BufReader::new(stdout);
175        jsonl::parse_jsonl_reader(stream_path, reader, |_, value| {
176            let Some(event_type) = value.get("type").and_then(Value::as_str) else {
177                return Ok(());
178            };
179
180            match event_type {
181                "session" => {
182                    if let Some(current_session_id) = value.get("id").and_then(Value::as_str)
183                        && session_id.as_deref() != Some(current_session_id)
184                    {
185                        sink.on_session_ready(ProviderKind::Pi, current_session_id)?;
186                        session_id = Some(current_session_id.to_string());
187                    }
188                }
189                "message_update" => {
190                    if value
191                        .get("assistantMessageEvent")
192                        .and_then(Value::as_object)
193                        .and_then(|event| event.get("type"))
194                        .and_then(Value::as_str)
195                        == Some("text_delta")
196                        && let Some(delta) = value
197                            .get("assistantMessageEvent")
198                            .and_then(Value::as_object)
199                            .and_then(|event| event.get("delta"))
200                            .and_then(Value::as_str)
201                        && !delta.is_empty()
202                    {
203                        sink.on_text_delta(delta)?;
204                        streamed_text.push_str(delta);
205                        final_text = Some(streamed_text.clone());
206                        streamed_delta = true;
207                    }
208                }
209                "message_end" | "turn_end" => {
210                    if streamed_delta {
211                        return Ok(());
212                    }
213                    if let Some(text) = value
214                        .get("message")
215                        .and_then(Self::extract_assistant_text)
216                        .filter(|text| !text.is_empty())
217                    {
218                        sink.on_text_delta(&text)?;
219                        final_text = Some(text);
220                    }
221                }
222                _ => {}
223            }
224            Ok(())
225        })?;
226
227        let status = child.wait().map_err(|source| XurlError::Io {
228            path: PathBuf::from(Self::pi_bin()),
229            source,
230        })?;
231        let stderr_content = stderr_handle.join().unwrap_or_default();
232        if !status.success() {
233            return Err(XurlError::CommandFailed {
234                command: format!("{} {}", Self::pi_bin(), args.join(" ")),
235                code: status.code(),
236                stderr: stderr_content.trim().to_string(),
237            });
238        }
239
240        let session_id = if let Some(session_id) = session_id {
241            session_id
242        } else {
243            return Err(XurlError::WriteProtocol(
244                "missing session id in pi event stream".to_string(),
245            ));
246        };
247
248        Ok(WriteResult {
249            provider: ProviderKind::Pi,
250            session_id,
251            final_text,
252            warnings,
253        })
254    }
255}
256
257impl Provider for PiProvider {
258    fn kind(&self) -> ProviderKind {
259        ProviderKind::Pi
260    }
261
262    fn resolve(&self, session_id: &str) -> Result<ResolvedThread> {
263        let sessions_root = self.sessions_root();
264        let candidates = Self::find_candidates(&sessions_root, session_id);
265
266        if let Some((selected, count)) = Self::choose_latest(candidates) {
267            let mut metadata = ResolutionMeta {
268                source: "pi:sessions".to_string(),
269                candidate_count: count,
270                warnings: Vec::new(),
271            };
272
273            if count > 1 {
274                metadata.warnings.push(format!(
275                    "multiple matches found ({count}) for session_id={session_id}; selected latest: {}",
276                    selected.display()
277                ));
278            }
279
280            return Ok(ResolvedThread {
281                provider: ProviderKind::Pi,
282                session_id: session_id.to_string(),
283                path: selected,
284                metadata,
285            });
286        }
287
288        Err(XurlError::ThreadNotFound {
289            provider: ProviderKind::Pi.to_string(),
290            session_id: session_id.to_string(),
291            searched_roots: vec![sessions_root],
292        })
293    }
294
295    fn write(&self, req: &WriteRequest, sink: &mut dyn WriteEventSink) -> Result<WriteResult> {
296        if let Some(role) = req.options.role.as_deref() {
297            return Err(XurlError::InvalidMode(format!(
298                "provider `{}` does not support role-based write URI (`{role}`)",
299                ProviderKind::Pi
300            )));
301        }
302        let warnings = Vec::new();
303        let mut args = Vec::new();
304        if let Some(session_id) = req.session_id.as_deref() {
305            let resolved = self.resolve(session_id)?;
306            let session_path = resolved.path.to_string_lossy().to_string();
307            args.push("--session".to_string());
308            args.push(session_path);
309            args.push("-p".to_string());
310            args.push(req.prompt.clone());
311            args.push("--mode".to_string());
312            args.push("json".to_string());
313        } else {
314            args.push("-p".to_string());
315            args.push(req.prompt.clone());
316            args.push("--mode".to_string());
317            args.push("json".to_string());
318        }
319        append_passthrough_args(&mut args, &req.options.params);
320        self.run_write(&args, req, sink, warnings)
321    }
322}
323
324#[cfg(test)]
325mod tests {
326    use std::fs;
327    use std::path::{Path, PathBuf};
328    use std::thread;
329    use std::time::Duration;
330
331    use tempfile::tempdir;
332
333    use crate::provider::Provider;
334    use crate::provider::pi::PiProvider;
335
336    fn write_session(root: &Path, session_dir: &str, file_name: &str, session_id: &str) -> PathBuf {
337        let path = root.join("sessions").join(session_dir).join(file_name);
338        fs::create_dir_all(path.parent().expect("parent")).expect("mkdir");
339        fs::write(
340            &path,
341            format!(
342                "{{\"type\":\"session\",\"version\":3,\"id\":\"{session_id}\",\"timestamp\":\"2026-02-23T13:00:12.780Z\",\"cwd\":\"/tmp/project\"}}\n{{\"type\":\"message\",\"id\":\"a1b2c3d4\",\"parentId\":null,\"timestamp\":\"2026-02-23T13:00:13.000Z\",\"message\":{{\"role\":\"user\",\"content\":[{{\"type\":\"text\",\"text\":\"hello\"}}],\"timestamp\":1771851717843}}}}\n"
343            ),
344        )
345        .expect("write");
346        path
347    }
348
349    #[test]
350    fn resolves_from_sessions_directory() {
351        let temp = tempdir().expect("tempdir");
352        let session_id = "12cb4c19-2774-4de4-a0d0-9fa32fbae29f";
353        let path = write_session(
354            temp.path(),
355            "--Users-xuanwo-Code-xurl--",
356            "2026-02-23T13-00-12-780Z_12cb4c19-2774-4de4-a0d0-9fa32fbae29f.jsonl",
357            session_id,
358        );
359
360        let provider = PiProvider::new(temp.path());
361        let resolved = provider
362            .resolve(session_id)
363            .expect("resolve should succeed");
364
365        assert_eq!(resolved.path, path);
366        assert_eq!(resolved.metadata.source, "pi:sessions");
367    }
368
369    #[test]
370    fn selects_latest_when_multiple_matches_exist() {
371        let temp = tempdir().expect("tempdir");
372        let session_id = "12cb4c19-2774-4de4-a0d0-9fa32fbae29f";
373
374        let first = write_session(
375            temp.path(),
376            "--Users-xuanwo-Code-project-a--",
377            "2026-02-23T13-00-12-780Z_12cb4c19-2774-4de4-a0d0-9fa32fbae29f.jsonl",
378            session_id,
379        );
380        thread::sleep(Duration::from_millis(15));
381        let second = write_session(
382            temp.path(),
383            "--Users-xuanwo-Code-project-b--",
384            "2026-02-23T13-10-12-780Z_12cb4c19-2774-4de4-a0d0-9fa32fbae29f.jsonl",
385            session_id,
386        );
387
388        let provider = PiProvider::new(temp.path());
389        let resolved = provider
390            .resolve(session_id)
391            .expect("resolve should succeed");
392
393        assert_eq!(resolved.path, second);
394        assert_eq!(resolved.metadata.candidate_count, 2);
395        assert_eq!(resolved.metadata.warnings.len(), 1);
396        assert!(resolved.metadata.warnings[0].contains("multiple matches"));
397        assert!(first.exists());
398    }
399
400    #[test]
401    fn missing_thread_returns_not_found() {
402        let temp = tempdir().expect("tempdir");
403        let provider = PiProvider::new(temp.path());
404        let err = provider
405            .resolve("12cb4c19-2774-4de4-a0d0-9fa32fbae29f")
406            .expect_err("must fail");
407        assert!(format!("{err}").contains("thread not found"));
408    }
409}