Skip to main content

pond/adapter/
pi_coding_agent.rs

1//! pi-coding-agent adapter (github.com/badlogic/pi-mono).
2//!
3//! Source path: `~/.pi/agent/sessions/<project-slug>/<ISO-ts>_<uuid>.jsonl`.
4//! One `.jsonl` file per session; each line is a typed record linked via a
5//! `parentId` -> `id` chain (pi-coding-agent's leaf-cursor DAG). Top-level types:
6//! `session` (consumed up front for Session), `model_change` /
7//! `thinking_level_change` / `compaction` (session-state carriers kept as
8//! System messages), and `message` (the per-turn model interaction, with
9//! roles user / assistant / toolResult and content nested under `.message`).
10//!
11//! v1 stores the linear log ordered by source line; pi-coding-agent's `parentId` fork
12//! graph (spec.md#deferred: multi-level fork lineage) is not collapsed into
13//! `parent_message_id` but preserved verbatim in `options.source.raw_record`
14//! for a future branching consumer.
15
16use std::path::{Path, PathBuf};
17
18use chrono::{DateTime, SecondsFormat, Utc};
19use serde_json::{Value, json};
20
21use crate::{
22    sessions::IngestEvent,
23    wire::{Message, Part, PartKind, Provenance, ProviderOptions, Session},
24};
25
26use super::{
27    Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
28    RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
29    empty_options,
30    extract::{Extracted, extract_compact_repr, extract_raw_record, extract_str},
31    extracted_text,
32    jsonl::{
33        BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events, peek_last_line, source_line,
34    },
35    jsonl_bytes, part_id, part_ordinal, raw_record,
36};
37
38const NAME: &str = "pi-coding-agent";
39
40/// Stateless factory: opens [`PiCodingAgentAdapter`] instances and probes for the
41/// canonical install location under `~/.pi/agent/sessions`.
42pub struct PiCodingAgentFactory;
43
44impl AdapterFactory for PiCodingAgentFactory {
45    fn name(&self) -> &'static str {
46        NAME
47    }
48
49    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
50        Ok(Box::new(PiCodingAgentAdapter::new(config_path(
51            NAME, config,
52        )?)))
53    }
54
55    fn probe_default(&self, env: &Env) -> Option<Value> {
56        let path = env.home.join(".pi").join("agent").join("sessions");
57        path.exists().then(|| json!({ "path": path }))
58    }
59
60    fn serialize(
61        &self,
62        session: &crate::sessions::SessionWithMessages,
63        fidelity: RestoreFidelity,
64    ) -> Result<Vec<RestoredFile>, AdapterError> {
65        serialize_session(session, fidelity)
66    }
67}
68
69fn serialize_session(
70    session: &crate::sessions::SessionWithMessages,
71    fidelity: RestoreFidelity,
72) -> Result<Vec<RestoredFile>, AdapterError> {
73    // Native replays the verbatim `options.source.raw_record` rows (the
74    // `session` line first, then one per message in source order); `pi_record`
75    // below is the foreign-only reconstruction. Replay echoes a frozen
76    // snapshot - safe only while canonical is append-only
77    // (spec.md#adapter-integrity-additive-sync).
78    //
79    // spec.md#adapter-native-restore-lossless: if Native is requested but the
80    // session has no stored `raw_record`, we downgrade to foreign and stamp
81    // `actual_fidelity` so the caller can signal the downgrade. Mirrors
82    // opencode's behavior - both adapters serve the best they can and tell
83    // the truth about what they served.
84    let session_raw = raw_record(&session.session.options);
85    let actual = match fidelity {
86        RestoreFidelity::Native if session_raw.is_some() => RestoreFidelity::Native,
87        _ => RestoreFidelity::Foreign,
88    };
89
90    let mut records = Vec::new();
91    if actual == RestoreFidelity::Native {
92        records.push(session_raw.unwrap_or_else(|| pi_session_record(session)));
93    } else {
94        records.push(pi_session_record(session));
95    }
96
97    // Sort message references rather than cloning the whole vec; restore is a
98    // hot path when users `pond restore` large sessions.
99    let mut messages: Vec<&crate::sessions::MessageWithParts> = session.messages.iter().collect();
100    if actual == RestoreFidelity::Native {
101        messages.sort_by(|left, right| {
102            source_line(left.message.options())
103                .cmp(&source_line(right.message.options()))
104                .then_with(|| by_timestamp_then_id(left, right))
105        });
106    } else {
107        messages.sort_by(|left, right| by_timestamp_then_id(left, right));
108    }
109
110    for message in &messages {
111        if actual == RestoreFidelity::Native
112            && let Some(raw) = raw_record(message.message.options())
113        {
114            records.push(raw);
115            continue;
116        }
117        // Foreign restore: a System carrier (model/thinking/compaction record)
118        // has no idiomatic home in another client's transcript - drop it; the
119        // content stays in canonical (spec.md#adapter-native-restore-lossless,
120        // foreign clause).
121        if matches!(message.message, Message::System { .. }) {
122            continue;
123        }
124        records.push(pi_message_record(message));
125    }
126
127    Ok(vec![RestoredFile::new(
128        pi_relative_path(session),
129        jsonl_bytes(NAME, &records)?,
130        actual,
131    )])
132}
133
134/// Reproduce the on-disk `sessions/<slug>/<file>.jsonl` path from the slug and
135/// file name captured at ingest. Falls back to a derived name when a foreign
136/// session never carried them.
137fn pi_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
138    let source = session.session.options.get("source");
139    let slug = source
140        .and_then(|s| s.get("project_slug"))
141        .and_then(Value::as_str)
142        .map(ToOwned::to_owned)
143        .unwrap_or_else(|| encode_project(&session.session.project));
144    let file_name = source
145        .and_then(|s| s.get("file_name"))
146        .and_then(Value::as_str)
147        .map(ToOwned::to_owned)
148        .unwrap_or_else(|| {
149            let ts = session.session.created_at.format("%Y-%m-%dT%H-%M-%S-%3fZ");
150            format!("{ts}_{}.jsonl", session.session.id)
151        });
152    PathBuf::from("sessions").join(slug).join(file_name)
153}
154
/// Fallback directory slug for a session with no recorded slug: every `/` and
/// every `.` in the project path becomes `-`.
fn encode_project(project: &str) -> String {
    project
        .chars()
        .map(|c| if c == '/' || c == '.' { '-' } else { c })
        .collect()
}
158
159fn pi_session_record(session: &crate::sessions::SessionWithMessages) -> Value {
160    json!({
161        "type": "session",
162        "version": 3,
163        "id": session.session.id,
164        "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
165        "cwd": &*session.session.project,
166    })
167}
168
169fn pi_message_record(message: &crate::sessions::MessageWithParts) -> Value {
170    json!({
171        "type": "message",
172        "id": message.message.id(),
173        "parentId": message.message.options().get("source").and_then(|s| s.get("parent_id")),
174        "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
175        "message": pi_inner_message(message),
176    })
177}
178
179fn pi_inner_message(message: &crate::sessions::MessageWithParts) -> Value {
180    let epoch_ms = message.message.timestamp().timestamp_millis();
181    match &message.message {
182        Message::User { .. } => json!({
183            "role": "user",
184            "content": message.parts.iter().map(pi_content_item).collect::<Vec<_>>(),
185            "timestamp": epoch_ms,
186        }),
187        Message::Assistant { .. } => json!({
188            "role": "assistant",
189            "content": message.parts.iter().map(pi_content_item).collect::<Vec<_>>(),
190            "timestamp": epoch_ms,
191        }),
192        Message::Tool { .. } => {
193            // spec.md#adapter-native-restore-lossless (foreign clause): a
194            // canonical Tool message with no ToolResult part - or with parts
195            // that lack call_id/name - serializes with empty-string slots.
196            // That's lossy by design for foreign restore; the unaltered
197            // source still lives in canonical and in `raw_record`.
198            let part = message.parts.first();
199            let (call_id, name, is_error, result) = match part.map(|p| &p.kind) {
200                Some(PartKind::ToolResult {
201                    call_id,
202                    name,
203                    is_failure,
204                    result,
205                }) => (
206                    extracted_text(call_id).to_owned(),
207                    extracted_text(name).to_owned(),
208                    *is_failure,
209                    result.clone(),
210                ),
211                _ => (String::new(), String::new(), false, Value::Null),
212            };
213            json!({
214                "role": "toolResult",
215                "toolCallId": call_id,
216                "toolName": name,
217                "content": result,
218                "isError": is_error,
219                "timestamp": epoch_ms,
220            })
221        }
222        // serialize_session drops System carriers before reaching here in
223        // foreign mode, and native mode replays the source row verbatim - so
224        // this arm only fires if a caller invokes pi_message_record on a
225        // System message directly. Unreachable on every legitimate path.
226        Message::System { .. } => {
227            unreachable!("System messages are not serialized through pi_inner_message")
228        }
229    }
230}
231
232fn pi_content_item(part: &Part) -> Value {
233    match &part.kind {
234        PartKind::Text { text } => json!({"type": "text", "text": extracted_text(text)}),
235        PartKind::Reasoning { text } => json!({
236            "type": "thinking",
237            "thinking": extracted_text(text),
238            "thinkingSignature": part
239                .options
240                .get("pi")
241                .and_then(|p| p.get("thinking_signature")),
242        }),
243        PartKind::ToolCall {
244            call_id,
245            name,
246            params,
247            ..
248        } => json!({
249            "type": "toolCall",
250            "id": extracted_text(call_id),
251            "name": extracted_text(name),
252            "arguments": params,
253        }),
254        other => json!({
255            "type": "text",
256            "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
257        }),
258    }
259}
260
261/// Configured pi-coding-agent reader. Walks a tree of `*.jsonl` files under [`Self::root`]
262/// and yields canonical events in source order per session.
263#[derive(Debug, Clone)]
264pub struct PiCodingAgentAdapter {
265    root: PathBuf,
266}
267
268impl PiCodingAgentAdapter {
269    pub fn new(root: impl Into<PathBuf>) -> Self {
270        Self { root: root.into() }
271    }
272}
273
274impl Adapter for PiCodingAgentAdapter {
275    fn discover(&self) -> DiscoverFuture<'_> {
276        jsonl_tree_discover(self)
277    }
278
279    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
280        jsonl_tree_events(self, oracle)
281    }
282}
283
284impl JsonlTree for PiCodingAgentAdapter {
285    // pi-coding-agent's `toolResult` records carry their own `toolName`, so unlike
286    // claude-code / codex-cli the adapter needs no per-file call_id -> name map.
287    type State = ();
288
289    fn name(&self) -> &'static str {
290        NAME
291    }
292
293    fn root(&self) -> &Path {
294        &self.root
295    }
296
297    fn peek_session_id(&self, _path: &Path, first_line: &str) -> Option<String> {
298        let row: Value = serde_json::from_str(first_line).ok()?;
299        if row.get("type").and_then(Value::as_str) == Some("session") {
300            row.get("id").and_then(Value::as_str).map(ToOwned::to_owned)
301        } else {
302            None
303        }
304    }
305
306    fn peek_last_ts(&self, path: &Path) -> Option<i64> {
307        // The `session` header is eventless; every other row is a message
308        // carrying its own `timestamp`. The transcript is append-ordered, so the
309        // last line is the latest message; a `session`-only or timestamp-less
310        // tail yields None and the file re-reads (safe).
311        let row: Value = serde_json::from_str(&peek_last_line(path)?).ok()?;
312        if row.get("type").and_then(Value::as_str) == Some("session") {
313            return None;
314        }
315        let text = row.get("timestamp").and_then(Value::as_str)?;
316        Some(
317            DateTime::parse_from_rfc3339(text)
318                .ok()?
319                .with_timezone(&Utc)
320                .timestamp_micros(),
321        )
322    }
323
324    fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
325        session_from_rows(path, rows)
326    }
327
328    fn events_from_row(
329        &self,
330        session: &Session,
331        row: &BoundedRow,
332        _state: &mut Self::State,
333    ) -> Result<Vec<IngestEvent>, String> {
334        events_from_row(&session.id, row.line, &row.value, session.created_at)
335    }
336}
337
338fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
339    let path_display = path.display().to_string();
340    let first = rows
341        .first()
342        .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
343    let row = &first.value;
344    let at_first = format!("{path_display}:{}", first.line);
345    if row.get("type").and_then(Value::as_str) != Some("session") {
346        return Err(AdapterError::schema(
347            NAME,
348            at_first,
349            "first row must be a `session` record",
350        ));
351    }
352    let id = row
353        .get("id")
354        .and_then(Value::as_str)
355        .ok_or_else(|| AdapterError::schema(NAME, at_first.clone(), "session record missing id"))?
356        .to_owned();
357    let created_at = row
358        .get("timestamp")
359        .and_then(Value::as_str)
360        .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
361        .map(|dt| dt.with_timezone(&Utc))
362        .ok_or_else(|| {
363            AdapterError::schema(
364                NAME,
365                at_first.clone(),
366                "session record has no parseable timestamp",
367            )
368        })?;
369    let project = extract_str(row, "cwd").ok_or_else(|| {
370        // spec.md#model-project-non-empty: pi always records `cwd` on the
371        // session line; its absence is a malformed session, not a default.
372        AdapterError::schema(NAME, at_first, "session record missing cwd")
373    })?;
374
375    // Capture the exact on-disk path components so native restore reproduces
376    // the source file byte-for-byte (the slug encoding and filename timestamp
377    // are not recomputable from canonical alone).
378    let project_slug = path
379        .parent()
380        .and_then(|p| p.file_name())
381        .and_then(|n| n.to_str())
382        .map(ToOwned::to_owned);
383    let file_name = path
384        .file_name()
385        .and_then(|n| n.to_str())
386        .map(ToOwned::to_owned);
387
388    let mut options = ProviderOptions::new();
389    options.insert(
390        "source".to_owned(),
391        json!({
392            "adapter": NAME,
393            "version": row.get("version"),
394            "project_slug": project_slug,
395            "file_name": file_name,
396            "raw_record": extract_raw_record(row),
397        }),
398    );
399
400    Ok(Session {
401        id,
402        parent_session_id: None,
403        parent_message_id: None,
404        source_agent: NAME.to_owned(),
405        created_at,
406        project,
407        options,
408    })
409}
410
411/// Map one pi JSONL record into zero-or-more `IngestEvent`s. `session` is
412/// consumed up front (eventless here); `model_change` / `thinking_level_change`
413/// / `compaction` become System carriers; `message` becomes a User / Assistant
414/// / Tool message plus its content Parts.
415fn events_from_row(
416    session_id: &str,
417    line: usize,
418    row: &Value,
419    default_timestamp: DateTime<Utc>,
420) -> Result<Vec<IngestEvent>, String> {
421    let kind = row.get("type").and_then(Value::as_str);
422    let timestamp = row
423        .get("timestamp")
424        .and_then(Value::as_str)
425        .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
426        .map(|dt| dt.with_timezone(&Utc))
427        .unwrap_or(default_timestamp);
428    let id = row
429        .get("id")
430        .and_then(Value::as_str)
431        .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
432
433    match kind {
434        // Consumed up front by `session_from_rows`.
435        Some("session") => Ok(Vec::new()),
436        Some("message") => {
437            let message_value = row
438                .get("message")
439                .ok_or_else(|| "message record missing `message` field".to_owned())?;
440            message_events(session_id, &id, timestamp, row, message_value, line)
441        }
442        // Session-state carriers: keep the human-meaningful field as content,
443        // the rest of the record in `options.source.raw_record`.
444        Some("compaction") => Ok(vec![carrier_event(
445            session_id,
446            &id,
447            timestamp,
448            row,
449            line,
450            extract_str(row, "summary"),
451        )]),
452        Some("model_change") | Some("thinking_level_change") => Ok(vec![carrier_event(
453            session_id,
454            &id,
455            timestamp,
456            row,
457            line,
458            extract_str(row, "type"),
459        )]),
460        // Unknown record type: preserve it as a System carrier rather than
461        // dropping (spec.md#adapter-integrity-no-silent-drops). The raw record
462        // survives in options; the type label is the content.
463        _ => Ok(vec![carrier_event(
464            session_id,
465            &id,
466            timestamp,
467            row,
468            line,
469            extract_str(row, "type"),
470        )]),
471    }
472}
473
474fn carrier_event(
475    session_id: &str,
476    id: &str,
477    timestamp: DateTime<Utc>,
478    row: &Value,
479    line: usize,
480    content: Option<Extracted<String>>,
481) -> IngestEvent {
482    IngestEvent::Message(Message::System {
483        id: id.to_owned(),
484        session_id: session_id.to_owned(),
485        timestamp,
486        content,
487        options: row_options(row, line),
488    })
489}
490
491fn message_events(
492    session_id: &str,
493    id: &str,
494    timestamp: DateTime<Utc>,
495    row: &Value,
496    message_value: &Value,
497    line: usize,
498) -> Result<Vec<IngestEvent>, String> {
499    let role = message_value
500        .get("role")
501        .and_then(Value::as_str)
502        .ok_or_else(|| "message missing role".to_owned())?;
503    let content = message_value
504        .get("content")
505        .and_then(Value::as_array)
506        .cloned()
507        .unwrap_or_default();
508
509    let mut parts = Vec::new();
510    let message = match role {
511        "user" => {
512            // spec.md#model-part-provenance: pi user messages are genuine human
513            // prompts; harness-injected context arrives as separate records
514            // (compaction, model_change), not inside a user turn.
515            for (ordinal, item) in content.iter().enumerate() {
516                parts.push(user_part(session_id, id, ordinal, item));
517            }
518            Message::User {
519                id: id.to_owned(),
520                session_id: session_id.to_owned(),
521                timestamp,
522                options: row_options(row, line),
523            }
524        }
525        "assistant" => {
526            for (ordinal, item) in content.iter().enumerate() {
527                parts.push(assistant_part(session_id, id, ordinal, item));
528            }
529            Message::Assistant {
530                id: id.to_owned(),
531                session_id: session_id.to_owned(),
532                timestamp,
533                options: assistant_options(row, message_value, line),
534            }
535        }
536        "toolResult" => {
537            parts.push(tool_result_part(session_id, id, message_value));
538            Message::Tool {
539                id: id.to_owned(),
540                session_id: session_id.to_owned(),
541                timestamp,
542                options: row_options(row, line),
543            }
544        }
545        // Unknown nested roles are still parseable source records. Preserve the
546        // row as a System carrier instead of turning it into a counted drop.
547        _ => Message::System {
548            id: id.to_owned(),
549            session_id: session_id.to_owned(),
550            timestamp,
551            content: extract_str(message_value, "role"),
552            options: row_options(row, line),
553        },
554    };
555
556    let mut events = Vec::with_capacity(parts.len() + 1);
557    events.push(IngestEvent::Message(message));
558    events.extend(parts.into_iter().map(IngestEvent::Part));
559    Ok(events)
560}
561
562fn user_part(session_id: &str, message_id: &str, ordinal: usize, item: &Value) -> Part {
563    let kind = match item.get("type").and_then(Value::as_str) {
564        Some("text") => PartKind::Text {
565            text: extract_str(item, "text"),
566        },
567        // Anything else (e.g. an `image` content item) is preserved losslessly
568        // as a compact-JSON Text Part rather than dropped.
569        _ => PartKind::Text {
570            text: Some(extract_compact_repr(item)),
571        },
572    };
573    Part {
574        session_id: session_id.to_owned(),
575        id: part_id(message_id, ordinal),
576        message_id: message_id.to_owned(),
577        ordinal: part_ordinal(ordinal),
578        // spec.md#model-part-provenance: a genuine human prompt is conversation.
579        provenance: Provenance::Conversational,
580        options: empty_options(),
581        kind,
582    }
583}
584
585fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, item: &Value) -> Part {
586    // spec.md#model-part-provenance: assistant text, reasoning, and tool calls
587    // are model-authored, hence conversational.
588    let (kind, options) = match item.get("type").and_then(Value::as_str) {
589        Some("text") => (
590            PartKind::Text {
591                text: extract_str(item, "text"),
592            },
593            empty_options(),
594        ),
595        Some("thinking") => (
596            PartKind::Reasoning {
597                text: extract_str(item, "thinking"),
598            },
599            thinking_options(item),
600        ),
601        Some("toolCall") => (
602            PartKind::ToolCall {
603                call_id: extract_str(item, "id"),
604                name: extract_str(item, "name"),
605                params: item.get("arguments").cloned().unwrap_or(Value::Null),
606                provider_executed: false,
607            },
608            empty_options(),
609        ),
610        // Lossless fallback for an unrecognised assistant content shape.
611        _ => (
612            PartKind::Text {
613                text: Some(extract_compact_repr(item)),
614            },
615            empty_options(),
616        ),
617    };
618    Part {
619        session_id: session_id.to_owned(),
620        id: part_id(message_id, ordinal),
621        message_id: message_id.to_owned(),
622        ordinal: part_ordinal(ordinal),
623        provenance: Provenance::Conversational,
624        options,
625        kind,
626    }
627}
628
629fn tool_result_part(session_id: &str, message_id: &str, message_value: &Value) -> Part {
630    Part {
631        session_id: session_id.to_owned(),
632        id: part_id(message_id, 0),
633        message_id: message_id.to_owned(),
634        ordinal: 0,
635        // spec.md#model-part-provenance: tool output is runtime-produced.
636        provenance: Provenance::Injected,
637        options: empty_options(),
638        kind: PartKind::ToolResult {
639            call_id: extract_str(message_value, "toolCallId"),
640            name: extract_str(message_value, "toolName"),
641            is_failure: message_value
642                .get("isError")
643                .and_then(Value::as_bool)
644                .unwrap_or(false),
645            // The whole `content` array (text and/or image items) is the
646            // faithful result payload.
647            result: message_value.get("content").cloned().unwrap_or(Value::Null),
648        },
649    }
650}
651
652fn row_options(row: &Value, line: usize) -> ProviderOptions {
653    let mut options = ProviderOptions::new();
654    options.insert(
655        "source".to_owned(),
656        json!({
657            "line": line,
658            "parent_id": row.get("parentId"),
659            "raw_type": row.get("type"),
660            "raw_record": extract_raw_record(row),
661        }),
662    );
663    options
664}
665
666fn assistant_options(row: &Value, message_value: &Value, line: usize) -> ProviderOptions {
667    let mut options = row_options(row, line);
668    options.insert(
669        "pi".to_owned(),
670        json!({
671            "api": message_value.get("api"),
672            "provider": message_value.get("provider"),
673            "model": message_value.get("model"),
674            "usage": message_value.get("usage"),
675            "stop_reason": message_value.get("stopReason"),
676            "response_id": message_value.get("responseId"),
677        }),
678    );
679    options
680}
681
682fn thinking_options(item: &Value) -> ProviderOptions {
683    let mut options = ProviderOptions::new();
684    if let Some(signature) = item.get("thinkingSignature") {
685        options.insert("pi".to_owned(), json!({ "thinking_signature": signature }));
686    }
687    options
688}
689
690#[cfg(test)]
691mod tests {
692    //! End-to-end test for the pi-coding-agent adapter: ingest the committed fixture corpus
693    //! and assert pond's canonical Session/Message/Part shape comes out the
694    //! other side. The fixture lives under `tests/fixtures/adapter/pi-coding-agent/`.
695    #![allow(clippy::expect_used, clippy::unwrap_used)]
696
697    use super::*;
698    use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
699    use tempfile::TempDir;
700
701    // Manifest-dir anchored: unit tests must not depend on the process cwd
702    // (figment::Jail chdirs the whole test process while config tests run).
703    const FIXTURES: &str = concat!(
704        env!("CARGO_MANIFEST_DIR"),
705        "/tests/fixtures/adapter/pi-coding-agent/sessions"
706    );
707
708    #[test]
709    fn probe_default_finds_pi_sessions_under_home() -> anyhow::Result<()> {
710        crate::adapter::test_support::assert_probe_default(
711            &PiCodingAgentFactory,
712            &[".pi", "agent", "sessions"],
713        )
714    }
715
716    #[tokio::test(flavor = "multi_thread")]
717    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
718        let adapter = PiCodingAgentAdapter::new(FIXTURES);
719        crate::adapter::test_support::assert_native_restore(
720            &PiCodingAgentFactory,
721            &adapter,
722            // pi-coding-agent relative paths embed the `sessions/` segment, so the corpus
723            // root is FIXTURES' parent, not FIXTURES itself.
724            std::path::Path::new(FIXTURES)
725                .parent()
726                .expect("FIXTURES is nested under a corpus root"),
727        )
728        .await
729    }
730
731    #[tokio::test(flavor = "multi_thread")]
732    async fn pi_coding_agent_adapter_ingests_fixture_corpus_into_canonical_shape()
733    -> anyhow::Result<()> {
734        let temp = TempDir::new()?;
735        let store = Store::open_local(temp.path()).await?;
736        let adapter = PiCodingAgentAdapter::new(FIXTURES);
737
738        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
739        assert!(summary.accepted() > 0, "ingest must accept rows");
740        assert_eq!(summary.dropped_events, 0, "no per-event drops expected");
741        assert_eq!(
742            summary.dropped_sessions, 0,
743            "no session-level rejections expected"
744        );
745        assert_eq!(summary.skipped_files, 0, "no whole-file skips expected");
746
747        let (sessions, messages, parts) = store.row_counts().await?;
748        assert!(sessions > 0, "at least one pi-coding-agent session");
749        assert!(messages > 0, "at least one pi-coding-agent message");
750        assert!(parts > 0, "at least one pi-coding-agent Part");
751
752        let mut saw_tool_call = false;
753        let mut saw_tool_result = false;
754        let mut saw_reasoning = false;
755        for session_id in store.session_ids().await? {
756            let session = store
757                .get_session(&session_id)
758                .await?
759                .expect("session round-trips");
760            assert_eq!(session.session.source_agent, NAME);
761            assert!(
762                !(*session.session.project).is_empty(),
763                "spec.md#model-project-non-empty: project must be a real cwd",
764            );
765            for stored in &session.messages {
766                for part in &stored.parts {
767                    match &part.kind {
768                        PartKind::ToolCall { .. } => saw_tool_call = true,
769                        PartKind::ToolResult { .. } => saw_tool_result = true,
770                        PartKind::Reasoning { .. } => saw_reasoning = true,
771                        _ => {}
772                    }
773                }
774            }
775        }
776        assert!(saw_tool_call, "corpus has assistant tool calls");
777        assert!(saw_tool_result, "corpus has tool results");
778        assert!(saw_reasoning, "corpus has assistant reasoning");
779        Ok(())
780    }
781
782    #[test]
783    fn unknown_nested_message_role_becomes_system_carrier() -> anyhow::Result<()> {
784        let row = json!({
785            "type": "message",
786            "id": "mystery-message",
787            "message": {
788                "role": "mysteryRole",
789                "content": [{"type": "text", "text": "not yet understood"}]
790            }
791        });
792        let events = events_from_row(
793            "session-1",
794            42,
795            &row,
796            DateTime::parse_from_rfc3339("2026-04-28T18:47:32.280Z")?.with_timezone(&Utc),
797        )
798        .map_err(anyhow::Error::msg)?;
799
800        assert_eq!(events.len(), 1);
801        let IngestEvent::Message(Message::System {
802            id,
803            content,
804            options,
805            ..
806        }) = &events[0]
807        else {
808            panic!("unknown role must produce a System carrier");
809        };
810        assert_eq!(id, "mystery-message");
811        assert_eq!(content.as_deref().map(String::as_str), Some("mysteryRole"));
812        assert_eq!(
813            raw_record(options)
814                .and_then(|raw| raw.get("message").cloned())
815                .and_then(|message| message.get("role").cloned()),
816            Some(json!("mysteryRole")),
817        );
818        Ok(())
819    }
820
821    #[tokio::test(flavor = "multi_thread")]
822    async fn fork_parent_ids_and_compaction_summary_are_preserved() -> anyhow::Result<()> {
823        let temp = TempDir::new()?;
824        let root = temp.path().join("sessions");
825        let path = root
826            .join("project")
827            .join("2026-05-01T00-00-00-000Z_fork.jsonl");
828        write_jsonl_file(
829            &path,
830            &[
831                json!({
832                    "type": "session",
833                    "version": 3,
834                    "id": "pi-fork-session",
835                    "timestamp": "2026-05-01T00:00:00.000Z",
836                    "cwd": "/tmp/pi-fork",
837                }),
838                json!({
839                    "type": "message",
840                    "id": "parent-message",
841                    "timestamp": "2026-05-01T00:00:01.000Z",
842                    "message": {
843                        "role": "user",
844                        "content": [{"type": "text", "text": "parent"}],
845                    },
846                }),
847                json!({
848                    "type": "message",
849                    "id": "child-a",
850                    "parentId": "parent-message",
851                    "timestamp": "2026-05-01T00:00:02.000Z",
852                    "message": {
853                        "role": "assistant",
854                        "content": [{"type": "text", "text": "branch a"}],
855                    },
856                }),
857                json!({
858                    "type": "message",
859                    "id": "child-b",
860                    "parentId": "parent-message",
861                    "timestamp": "2026-05-01T00:00:03.000Z",
862                    "message": {
863                        "role": "assistant",
864                        "content": [{"type": "text", "text": "branch b"}],
865                    },
866                }),
867                json!({
868                    "type": "compaction",
869                    "id": "compact-1",
870                    "parentId": "child-b",
871                    "timestamp": "2026-05-01T00:00:04.000Z",
872                    "summary": "compact summary",
873                }),
874            ],
875        )?;
876
877        let store = Store::open_local(temp.path().join("store")).await?;
878        let summary = ingest_adapter(
879            &store,
880            &PiCodingAgentAdapter::new(&root),
881            &crate::adapter::NoopOracle,
882            |_| {},
883        )
884        .await?;
885        assert_eq!(summary.dropped_events, 0);
886
887        let session = store
888            .get_session("pi-fork-session")
889            .await?
890            .expect("fixture session lands");
891        let child_a = session
892            .messages
893            .iter()
894            .find(|stored| stored.message.id() == "child-a")
895            .expect("first fork child lands");
896        let child_b = session
897            .messages
898            .iter()
899            .find(|stored| stored.message.id() == "child-b")
900            .expect("second fork child lands");
901        for child in [child_a, child_b] {
902            assert_eq!(
903                child
904                    .message
905                    .options()
906                    .get("source")
907                    .and_then(|source| source.get("parent_id"))
908                    .and_then(Value::as_str),
909                Some("parent-message"),
910            );
911        }
912        assert!(source_line(child_a.message.options()) < source_line(child_b.message.options()));
913
914        let compact = session
915            .messages
916            .iter()
917            .find(|stored| stored.message.id() == "compact-1")
918            .expect("compaction carrier lands");
919        let Message::System { content, .. } = &compact.message else {
920            panic!("compaction is preserved as a System carrier");
921        };
922        assert_eq!(
923            content.as_deref().map(String::as_str),
924            Some("compact summary")
925        );
926        Ok(())
927    }
928
929    #[tokio::test(flavor = "multi_thread")]
930    async fn foreign_serialization_reparses_as_pi_coding_agent() -> anyhow::Result<()> {
931        let temp = TempDir::new()?;
932        let origin_store = Store::open_local(temp.path().join("origin-store")).await?;
933        let origin = crate::adapter::OpencodeAdapter::new(concat!(
934            env!("CARGO_MANIFEST_DIR"),
935            "/tests/fixtures/adapter/opencode/storage"
936        ));
937        ingest_adapter(&origin_store, &origin, &crate::adapter::NoopOracle, |_| {}).await?;
938        let session_id = origin_store
939            .session_ids()
940            .await?
941            .into_iter()
942            .next()
943            .expect("opencode fixture has sessions");
944        let session = origin_store
945            .get_session(&session_id)
946            .await?
947            .expect("fixture session is readable");
948
949        let restored_root = temp.path().join("pi-corpus");
950        crate::adapter::write_restored_files(
951            &restored_root,
952            PiCodingAgentFactory.serialize(&session, RestoreFidelity::Foreign)?,
953        )?;
954        let restored_store = Store::open_local(temp.path().join("restored-store")).await?;
955        let summary = ingest_adapter(
956            &restored_store,
957            &PiCodingAgentAdapter::new(restored_root.join("sessions")),
958            &crate::adapter::NoopOracle,
959            |_| {},
960        )
961        .await?;
962
963        assert!(summary.accepted() > 0);
964        assert_eq!(summary.dropped_events, 0);
965        Ok(())
966    }
967
968    /// spec.md#model-part-provenance: a tool result is harness-injected; an
969    /// assistant turn's text/reasoning/tool-call parts are conversation.
970    #[tokio::test(flavor = "multi_thread")]
971    async fn tool_results_are_injected_assistant_parts_are_conversational() -> anyhow::Result<()> {
972        let temp = TempDir::new()?;
973        let store = Store::open_local(temp.path()).await?;
974        let adapter = PiCodingAgentAdapter::new(FIXTURES);
975        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
976
977        for session_id in store.session_ids().await? {
978            let session = store
979                .get_session(&session_id)
980                .await?
981                .expect("session round-trips");
982            for stored in &session.messages {
983                for part in &stored.parts {
984                    match &part.kind {
985                        PartKind::ToolResult { .. } => {
986                            assert_eq!(part.provenance, Provenance::Injected);
987                        }
988                        PartKind::ToolCall { .. } | PartKind::Reasoning { .. } => {
989                            assert_eq!(part.provenance, Provenance::Conversational);
990                        }
991                        _ => {}
992                    }
993                }
994            }
995        }
996        Ok(())
997    }
998
999    fn write_jsonl_file(path: &std::path::Path, records: &[Value]) -> anyhow::Result<()> {
1000        if let Some(parent) = path.parent() {
1001            std::fs::create_dir_all(parent)?;
1002        }
1003        std::fs::write(path, jsonl_bytes(NAME, records)?)?;
1004        Ok(())
1005    }
1006}