Skip to main content

pond/adapter/
pi_coding_agent.rs

1//! pi-coding-agent adapter (github.com/badlogic/pi-mono).
2//!
3//! Source path: `~/.pi/agent/sessions/<project-slug>/<ISO-ts>_<uuid>.jsonl`.
4//! One `.jsonl` file per session; each line is a typed record linked via a
5//! `parentId` -> `id` chain (pi-coding-agent's leaf-cursor DAG). Top-level types:
6//! `session` (consumed up front for Session), `model_change` /
7//! `thinking_level_change` / `compaction` (session-state carriers kept as
8//! System messages), and `message` (the per-turn model interaction, with
9//! roles user / assistant / toolResult and content nested under `.message`).
10//!
11//! v1 stores the linear log ordered by source line; pi-coding-agent's `parentId` fork
12//! graph (spec.md#deferred: multi-level fork lineage) is not collapsed into
13//! `parent_message_id` but preserved verbatim in `options.source.raw_record`
14//! for a future branching consumer.
15
16use std::path::{Path, PathBuf};
17
18use chrono::{DateTime, SecondsFormat, Utc};
19use serde_json::{Value, json};
20
21use crate::{
22    sessions::IngestEvent,
23    wire::{Message, Part, PartKind, Provenance, ProviderOptions, Session},
24};
25
26use super::{
27    Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
28    RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
29    empty_options,
30    extract::{Extracted, extract_compact_repr, extract_raw_record, extract_str},
31    extracted_text,
32    jsonl::{BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events, source_line},
33    jsonl_bytes, part_id, part_ordinal, raw_record,
34};
35
36const NAME: &str = "pi-coding-agent";
37
/// Stateless factory: opens [`PiCodingAgentAdapter`] instances and probes for the
/// canonical install location under `~/.pi/agent/sessions`.
///
/// The type is a field-less marker, so it derives the standard traits that
/// cost nothing: `Debug` keeps it printable in logs and assertion output
/// (matching the `Debug` derive on the adapter it opens), and `Clone` / `Copy`
/// / `Default` let callers pass it around and construct it without ceremony.
#[derive(Debug, Clone, Copy, Default)]
pub struct PiCodingAgentFactory;
41
42impl AdapterFactory for PiCodingAgentFactory {
43    fn name(&self) -> &'static str {
44        NAME
45    }
46
47    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
48        Ok(Box::new(PiCodingAgentAdapter::new(config_path(
49            NAME, config,
50        )?)))
51    }
52
53    fn probe_default(&self, env: &Env) -> Option<Value> {
54        let path = env.home.join(".pi").join("agent").join("sessions");
55        path.exists().then(|| json!({ "path": path }))
56    }
57
58    fn serialize(
59        &self,
60        session: &crate::sessions::SessionWithMessages,
61        fidelity: RestoreFidelity,
62    ) -> Result<Vec<RestoredFile>, AdapterError> {
63        serialize_session(session, fidelity)
64    }
65}
66
67fn serialize_session(
68    session: &crate::sessions::SessionWithMessages,
69    fidelity: RestoreFidelity,
70) -> Result<Vec<RestoredFile>, AdapterError> {
71    // Native replays the verbatim `options.source.raw_record` rows (the
72    // `session` line first, then one per message in source order); `pi_record`
73    // below is the foreign-only reconstruction. Replay echoes a frozen
74    // snapshot - safe only while canonical is append-only
75    // (spec.md#adapter-integrity-additive-sync).
76    //
77    // spec.md#adapter-native-restore-lossless: if Native is requested but the
78    // session has no stored `raw_record`, we downgrade to foreign and stamp
79    // `actual_fidelity` so the caller can signal the downgrade. Mirrors
80    // opencode's behavior - both adapters serve the best they can and tell
81    // the truth about what they served.
82    let session_raw = raw_record(&session.session.options);
83    let actual = match fidelity {
84        RestoreFidelity::Native if session_raw.is_some() => RestoreFidelity::Native,
85        _ => RestoreFidelity::Foreign,
86    };
87
88    let mut records = Vec::new();
89    if actual == RestoreFidelity::Native {
90        records.push(session_raw.unwrap_or_else(|| pi_session_record(session)));
91    } else {
92        records.push(pi_session_record(session));
93    }
94
95    // Sort message references rather than cloning the whole vec; restore is a
96    // hot path when users `pond restore` large sessions.
97    let mut messages: Vec<&crate::sessions::MessageWithParts> = session.messages.iter().collect();
98    if actual == RestoreFidelity::Native {
99        messages.sort_by(|left, right| {
100            source_line(left.message.options())
101                .cmp(&source_line(right.message.options()))
102                .then_with(|| by_timestamp_then_id(left, right))
103        });
104    } else {
105        messages.sort_by(|left, right| by_timestamp_then_id(left, right));
106    }
107
108    for message in &messages {
109        if actual == RestoreFidelity::Native
110            && let Some(raw) = raw_record(message.message.options())
111        {
112            records.push(raw);
113            continue;
114        }
115        // Foreign restore: a System carrier (model/thinking/compaction record)
116        // has no idiomatic home in another client's transcript - drop it; the
117        // content stays in canonical (spec.md#adapter-native-restore-lossless,
118        // foreign clause).
119        if matches!(message.message, Message::System { .. }) {
120            continue;
121        }
122        records.push(pi_message_record(message));
123    }
124
125    Ok(vec![RestoredFile::new(
126        pi_relative_path(session),
127        jsonl_bytes(NAME, &records)?,
128        actual,
129    )])
130}
131
132/// Reproduce the on-disk `sessions/<slug>/<file>.jsonl` path from the slug and
133/// file name captured at ingest. Falls back to a derived name when a foreign
134/// session never carried them.
135fn pi_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
136    let source = session.session.options.get("source");
137    let slug = source
138        .and_then(|s| s.get("project_slug"))
139        .and_then(Value::as_str)
140        .map(ToOwned::to_owned)
141        .unwrap_or_else(|| encode_project(&session.session.project));
142    let file_name = source
143        .and_then(|s| s.get("file_name"))
144        .and_then(Value::as_str)
145        .map(ToOwned::to_owned)
146        .unwrap_or_else(|| {
147            let ts = session.session.created_at.format("%Y-%m-%dT%H-%M-%S-%3fZ");
148            format!("{ts}_{}.jsonl", session.session.id)
149        });
150    PathBuf::from("sessions").join(slug).join(file_name)
151}
152
/// Turn a project path into a directory slug by replacing every `/` and `.`
/// with `-`; all other characters pass through unchanged.
fn encode_project(project: &str) -> String {
    project
        .chars()
        .map(|ch| match ch {
            '/' | '.' => '-',
            other => other,
        })
        .collect()
}
156
157fn pi_session_record(session: &crate::sessions::SessionWithMessages) -> Value {
158    json!({
159        "type": "session",
160        "version": 3,
161        "id": session.session.id,
162        "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
163        "cwd": &*session.session.project,
164    })
165}
166
167fn pi_message_record(message: &crate::sessions::MessageWithParts) -> Value {
168    json!({
169        "type": "message",
170        "id": message.message.id(),
171        "parentId": message.message.options().get("source").and_then(|s| s.get("parent_id")),
172        "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
173        "message": pi_inner_message(message),
174    })
175}
176
177fn pi_inner_message(message: &crate::sessions::MessageWithParts) -> Value {
178    let epoch_ms = message.message.timestamp().timestamp_millis();
179    match &message.message {
180        Message::User { .. } => json!({
181            "role": "user",
182            "content": message.parts.iter().map(pi_content_item).collect::<Vec<_>>(),
183            "timestamp": epoch_ms,
184        }),
185        Message::Assistant { .. } => json!({
186            "role": "assistant",
187            "content": message.parts.iter().map(pi_content_item).collect::<Vec<_>>(),
188            "timestamp": epoch_ms,
189        }),
190        Message::Tool { .. } => {
191            // spec.md#adapter-native-restore-lossless (foreign clause): a
192            // canonical Tool message with no ToolResult part - or with parts
193            // that lack call_id/name - serializes with empty-string slots.
194            // That's lossy by design for foreign restore; the unaltered
195            // source still lives in canonical and in `raw_record`.
196            let part = message.parts.first();
197            let (call_id, name, is_error, result) = match part.map(|p| &p.kind) {
198                Some(PartKind::ToolResult {
199                    call_id,
200                    name,
201                    is_failure,
202                    result,
203                }) => (
204                    extracted_text(call_id).to_owned(),
205                    extracted_text(name).to_owned(),
206                    *is_failure,
207                    result.clone(),
208                ),
209                _ => (String::new(), String::new(), false, Value::Null),
210            };
211            json!({
212                "role": "toolResult",
213                "toolCallId": call_id,
214                "toolName": name,
215                "content": result,
216                "isError": is_error,
217                "timestamp": epoch_ms,
218            })
219        }
220        // serialize_session drops System carriers before reaching here in
221        // foreign mode, and native mode replays the source row verbatim - so
222        // this arm only fires if a caller invokes pi_message_record on a
223        // System message directly. Unreachable on every legitimate path.
224        Message::System { .. } => {
225            unreachable!("System messages are not serialized through pi_inner_message")
226        }
227    }
228}
229
230fn pi_content_item(part: &Part) -> Value {
231    match &part.kind {
232        PartKind::Text { text } => json!({"type": "text", "text": extracted_text(text)}),
233        PartKind::Reasoning { text } => json!({
234            "type": "thinking",
235            "thinking": extracted_text(text),
236            "thinkingSignature": part
237                .options
238                .get("pi")
239                .and_then(|p| p.get("thinking_signature")),
240        }),
241        PartKind::ToolCall {
242            call_id,
243            name,
244            params,
245            ..
246        } => json!({
247            "type": "toolCall",
248            "id": extracted_text(call_id),
249            "name": extracted_text(name),
250            "arguments": params,
251        }),
252        other => json!({
253            "type": "text",
254            "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
255        }),
256    }
257}
258
/// Configured pi-coding-agent reader. Walks a tree of `*.jsonl` files under [`Self::root`]
/// and yields canonical events in source order per session.
#[derive(Debug, Clone)]
pub struct PiCodingAgentAdapter {
    /// Directory whose `*.jsonl` files are read.
    root: PathBuf,
}

impl PiCodingAgentAdapter {
    /// Create an adapter rooted at `root`; accepts anything convertible into
    /// a `PathBuf`.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root = root.into();
        Self { root }
    }
}
271
272impl Adapter for PiCodingAgentAdapter {
273    fn discover(&self) -> DiscoverFuture<'_> {
274        jsonl_tree_discover(self)
275    }
276
277    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
278        jsonl_tree_events(self, oracle)
279    }
280}
281
282impl JsonlTree for PiCodingAgentAdapter {
283    // pi-coding-agent's `toolResult` records carry their own `toolName`, so unlike
284    // claude-code / codex-cli the adapter needs no per-file call_id -> name map.
285    type State = ();
286
287    fn name(&self) -> &'static str {
288        NAME
289    }
290
291    fn root(&self) -> &Path {
292        &self.root
293    }
294
295    fn peek_session_id(&self, _path: &Path, first_line: &str) -> Option<String> {
296        let row: Value = serde_json::from_str(first_line).ok()?;
297        if row.get("type").and_then(Value::as_str) == Some("session") {
298            row.get("id").and_then(Value::as_str).map(ToOwned::to_owned)
299        } else {
300            None
301        }
302    }
303
304    fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
305        session_from_rows(path, rows)
306    }
307
308    fn events_from_row(
309        &self,
310        session: &Session,
311        row: &BoundedRow,
312        _state: &mut Self::State,
313    ) -> Result<Vec<IngestEvent>, String> {
314        events_from_row(&session.id, row.line, &row.value, session.created_at)
315    }
316}
317
318fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
319    let path_display = path.display().to_string();
320    let first = rows
321        .first()
322        .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
323    let row = &first.value;
324    let at_first = format!("{path_display}:{}", first.line);
325    if row.get("type").and_then(Value::as_str) != Some("session") {
326        return Err(AdapterError::schema(
327            NAME,
328            at_first,
329            "first row must be a `session` record",
330        ));
331    }
332    let id = row
333        .get("id")
334        .and_then(Value::as_str)
335        .ok_or_else(|| AdapterError::schema(NAME, at_first.clone(), "session record missing id"))?
336        .to_owned();
337    let created_at = row
338        .get("timestamp")
339        .and_then(Value::as_str)
340        .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
341        .map(|dt| dt.with_timezone(&Utc))
342        .ok_or_else(|| {
343            AdapterError::schema(
344                NAME,
345                at_first.clone(),
346                "session record has no parseable timestamp",
347            )
348        })?;
349    let project = extract_str(row, "cwd").ok_or_else(|| {
350        // spec.md#model-project-non-empty: pi always records `cwd` on the
351        // session line; its absence is a malformed session, not a default.
352        AdapterError::schema(NAME, at_first, "session record missing cwd")
353    })?;
354
355    // Capture the exact on-disk path components so native restore reproduces
356    // the source file byte-for-byte (the slug encoding and filename timestamp
357    // are not recomputable from canonical alone).
358    let project_slug = path
359        .parent()
360        .and_then(|p| p.file_name())
361        .and_then(|n| n.to_str())
362        .map(ToOwned::to_owned);
363    let file_name = path
364        .file_name()
365        .and_then(|n| n.to_str())
366        .map(ToOwned::to_owned);
367
368    let mut options = ProviderOptions::new();
369    options.insert(
370        "source".to_owned(),
371        json!({
372            "adapter": NAME,
373            "version": row.get("version"),
374            "project_slug": project_slug,
375            "file_name": file_name,
376            "raw_record": extract_raw_record(row),
377        }),
378    );
379
380    Ok(Session {
381        id,
382        parent_session_id: None,
383        parent_message_id: None,
384        source_agent: NAME.to_owned(),
385        created_at,
386        project,
387        options,
388    })
389}
390
391/// Map one pi JSONL record into zero-or-more `IngestEvent`s. `session` is
392/// consumed up front (eventless here); `model_change` / `thinking_level_change`
393/// / `compaction` become System carriers; `message` becomes a User / Assistant
394/// / Tool message plus its content Parts.
395fn events_from_row(
396    session_id: &str,
397    line: usize,
398    row: &Value,
399    default_timestamp: DateTime<Utc>,
400) -> Result<Vec<IngestEvent>, String> {
401    let kind = row.get("type").and_then(Value::as_str);
402    let timestamp = row
403        .get("timestamp")
404        .and_then(Value::as_str)
405        .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
406        .map(|dt| dt.with_timezone(&Utc))
407        .unwrap_or(default_timestamp);
408    let id = row
409        .get("id")
410        .and_then(Value::as_str)
411        .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
412
413    match kind {
414        // Consumed up front by `session_from_rows`.
415        Some("session") => Ok(Vec::new()),
416        Some("message") => {
417            let message_value = row
418                .get("message")
419                .ok_or_else(|| "message record missing `message` field".to_owned())?;
420            message_events(session_id, &id, timestamp, row, message_value, line)
421        }
422        // Session-state carriers: keep the human-meaningful field as content,
423        // the rest of the record in `options.source.raw_record`.
424        Some("compaction") => Ok(vec![carrier_event(
425            session_id,
426            &id,
427            timestamp,
428            row,
429            line,
430            extract_str(row, "summary"),
431        )]),
432        Some("model_change") | Some("thinking_level_change") => Ok(vec![carrier_event(
433            session_id,
434            &id,
435            timestamp,
436            row,
437            line,
438            extract_str(row, "type"),
439        )]),
440        // Unknown record type: preserve it as a System carrier rather than
441        // dropping (spec.md#adapter-integrity-no-silent-drops). The raw record
442        // survives in options; the type label is the content.
443        _ => Ok(vec![carrier_event(
444            session_id,
445            &id,
446            timestamp,
447            row,
448            line,
449            extract_str(row, "type"),
450        )]),
451    }
452}
453
454fn carrier_event(
455    session_id: &str,
456    id: &str,
457    timestamp: DateTime<Utc>,
458    row: &Value,
459    line: usize,
460    content: Option<Extracted<String>>,
461) -> IngestEvent {
462    IngestEvent::Message(Message::System {
463        id: id.to_owned(),
464        session_id: session_id.to_owned(),
465        timestamp,
466        content,
467        options: row_options(row, line),
468    })
469}
470
471fn message_events(
472    session_id: &str,
473    id: &str,
474    timestamp: DateTime<Utc>,
475    row: &Value,
476    message_value: &Value,
477    line: usize,
478) -> Result<Vec<IngestEvent>, String> {
479    let role = message_value
480        .get("role")
481        .and_then(Value::as_str)
482        .ok_or_else(|| "message missing role".to_owned())?;
483    let content = message_value
484        .get("content")
485        .and_then(Value::as_array)
486        .cloned()
487        .unwrap_or_default();
488
489    let mut parts = Vec::new();
490    let message = match role {
491        "user" => {
492            // spec.md#model-part-provenance: pi user messages are genuine human
493            // prompts; harness-injected context arrives as separate records
494            // (compaction, model_change), not inside a user turn.
495            for (ordinal, item) in content.iter().enumerate() {
496                parts.push(user_part(session_id, id, ordinal, item));
497            }
498            Message::User {
499                id: id.to_owned(),
500                session_id: session_id.to_owned(),
501                timestamp,
502                options: row_options(row, line),
503            }
504        }
505        "assistant" => {
506            for (ordinal, item) in content.iter().enumerate() {
507                parts.push(assistant_part(session_id, id, ordinal, item));
508            }
509            Message::Assistant {
510                id: id.to_owned(),
511                session_id: session_id.to_owned(),
512                timestamp,
513                options: assistant_options(row, message_value, line),
514            }
515        }
516        "toolResult" => {
517            parts.push(tool_result_part(session_id, id, message_value));
518            Message::Tool {
519                id: id.to_owned(),
520                session_id: session_id.to_owned(),
521                timestamp,
522                options: row_options(row, line),
523            }
524        }
525        // Unknown nested roles are still parseable source records. Preserve the
526        // row as a System carrier instead of turning it into a counted drop.
527        _ => Message::System {
528            id: id.to_owned(),
529            session_id: session_id.to_owned(),
530            timestamp,
531            content: extract_str(message_value, "role"),
532            options: row_options(row, line),
533        },
534    };
535
536    let mut events = Vec::with_capacity(parts.len() + 1);
537    events.push(IngestEvent::Message(message));
538    events.extend(parts.into_iter().map(IngestEvent::Part));
539    Ok(events)
540}
541
542fn user_part(session_id: &str, message_id: &str, ordinal: usize, item: &Value) -> Part {
543    let kind = match item.get("type").and_then(Value::as_str) {
544        Some("text") => PartKind::Text {
545            text: extract_str(item, "text"),
546        },
547        // Anything else (e.g. an `image` content item) is preserved losslessly
548        // as a compact-JSON Text Part rather than dropped.
549        _ => PartKind::Text {
550            text: Some(extract_compact_repr(item)),
551        },
552    };
553    Part {
554        session_id: session_id.to_owned(),
555        id: part_id(message_id, ordinal),
556        message_id: message_id.to_owned(),
557        ordinal: part_ordinal(ordinal),
558        // spec.md#model-part-provenance: a genuine human prompt is conversation.
559        provenance: Provenance::Conversational,
560        options: empty_options(),
561        kind,
562    }
563}
564
565fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, item: &Value) -> Part {
566    // spec.md#model-part-provenance: assistant text, reasoning, and tool calls
567    // are model-authored, hence conversational.
568    let (kind, options) = match item.get("type").and_then(Value::as_str) {
569        Some("text") => (
570            PartKind::Text {
571                text: extract_str(item, "text"),
572            },
573            empty_options(),
574        ),
575        Some("thinking") => (
576            PartKind::Reasoning {
577                text: extract_str(item, "thinking"),
578            },
579            thinking_options(item),
580        ),
581        Some("toolCall") => (
582            PartKind::ToolCall {
583                call_id: extract_str(item, "id"),
584                name: extract_str(item, "name"),
585                params: item.get("arguments").cloned().unwrap_or(Value::Null),
586                provider_executed: false,
587            },
588            empty_options(),
589        ),
590        // Lossless fallback for an unrecognised assistant content shape.
591        _ => (
592            PartKind::Text {
593                text: Some(extract_compact_repr(item)),
594            },
595            empty_options(),
596        ),
597    };
598    Part {
599        session_id: session_id.to_owned(),
600        id: part_id(message_id, ordinal),
601        message_id: message_id.to_owned(),
602        ordinal: part_ordinal(ordinal),
603        provenance: Provenance::Conversational,
604        options,
605        kind,
606    }
607}
608
609fn tool_result_part(session_id: &str, message_id: &str, message_value: &Value) -> Part {
610    Part {
611        session_id: session_id.to_owned(),
612        id: part_id(message_id, 0),
613        message_id: message_id.to_owned(),
614        ordinal: 0,
615        // spec.md#model-part-provenance: tool output is runtime-produced.
616        provenance: Provenance::Injected,
617        options: empty_options(),
618        kind: PartKind::ToolResult {
619            call_id: extract_str(message_value, "toolCallId"),
620            name: extract_str(message_value, "toolName"),
621            is_failure: message_value
622                .get("isError")
623                .and_then(Value::as_bool)
624                .unwrap_or(false),
625            // The whole `content` array (text and/or image items) is the
626            // faithful result payload.
627            result: message_value.get("content").cloned().unwrap_or(Value::Null),
628        },
629    }
630}
631
632fn row_options(row: &Value, line: usize) -> ProviderOptions {
633    let mut options = ProviderOptions::new();
634    options.insert(
635        "source".to_owned(),
636        json!({
637            "line": line,
638            "parent_id": row.get("parentId"),
639            "raw_type": row.get("type"),
640            "raw_record": extract_raw_record(row),
641        }),
642    );
643    options
644}
645
646fn assistant_options(row: &Value, message_value: &Value, line: usize) -> ProviderOptions {
647    let mut options = row_options(row, line);
648    options.insert(
649        "pi".to_owned(),
650        json!({
651            "api": message_value.get("api"),
652            "provider": message_value.get("provider"),
653            "model": message_value.get("model"),
654            "usage": message_value.get("usage"),
655            "stop_reason": message_value.get("stopReason"),
656            "response_id": message_value.get("responseId"),
657        }),
658    );
659    options
660}
661
662fn thinking_options(item: &Value) -> ProviderOptions {
663    let mut options = ProviderOptions::new();
664    if let Some(signature) = item.get("thinkingSignature") {
665        options.insert("pi".to_owned(), json!({ "thinking_signature": signature }));
666    }
667    options
668}
669
670#[cfg(test)]
671mod tests {
672    //! End-to-end test for the pi-coding-agent adapter: ingest the committed fixture corpus
673    //! and assert pond's canonical Session/Message/Part shape comes out the
674    //! other side. The fixture lives under `tests/fixtures/adapter/pi-coding-agent/`.
675    #![allow(clippy::expect_used, clippy::unwrap_used)]
676
677    use super::*;
678    use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
679    use tempfile::TempDir;
680
681    // Manifest-dir anchored: unit tests must not depend on the process cwd
682    // (figment::Jail chdirs the whole test process while config tests run).
683    const FIXTURES: &str = concat!(
684        env!("CARGO_MANIFEST_DIR"),
685        "/tests/fixtures/adapter/pi-coding-agent/sessions"
686    );
687
688    #[test]
689    fn probe_default_finds_pi_sessions_under_home() -> anyhow::Result<()> {
690        crate::adapter::test_support::assert_probe_default(
691            &PiCodingAgentFactory,
692            &[".pi", "agent", "sessions"],
693        )
694    }
695
696    #[tokio::test(flavor = "multi_thread")]
697    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
698        let adapter = PiCodingAgentAdapter::new(FIXTURES);
699        crate::adapter::test_support::assert_native_restore(
700            &PiCodingAgentFactory,
701            &adapter,
702            // pi-coding-agent relative paths embed the `sessions/` segment, so the corpus
703            // root is FIXTURES' parent, not FIXTURES itself.
704            std::path::Path::new(FIXTURES)
705                .parent()
706                .expect("FIXTURES is nested under a corpus root"),
707        )
708        .await
709    }
710
711    #[tokio::test(flavor = "multi_thread")]
712    async fn pi_coding_agent_adapter_ingests_fixture_corpus_into_canonical_shape()
713    -> anyhow::Result<()> {
714        let temp = TempDir::new()?;
715        let store = Store::open_local(temp.path()).await?;
716        let adapter = PiCodingAgentAdapter::new(FIXTURES);
717
718        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
719        assert!(summary.accepted() > 0, "ingest must accept rows");
720        assert_eq!(summary.dropped_events, 0, "no per-event drops expected");
721        assert_eq!(
722            summary.dropped_sessions, 0,
723            "no session-level rejections expected"
724        );
725        assert_eq!(summary.skipped_files, 0, "no whole-file skips expected");
726
727        let (sessions, messages, parts) = store.row_counts().await?;
728        assert!(sessions > 0, "at least one pi-coding-agent session");
729        assert!(messages > 0, "at least one pi-coding-agent message");
730        assert!(parts > 0, "at least one pi-coding-agent Part");
731
732        let mut saw_tool_call = false;
733        let mut saw_tool_result = false;
734        let mut saw_reasoning = false;
735        for session_id in store.session_ids().await? {
736            let session = store
737                .get_session(&session_id)
738                .await?
739                .expect("session round-trips");
740            assert_eq!(session.session.source_agent, NAME);
741            assert!(
742                !(*session.session.project).is_empty(),
743                "spec.md#model-project-non-empty: project must be a real cwd",
744            );
745            for stored in &session.messages {
746                for part in &stored.parts {
747                    match &part.kind {
748                        PartKind::ToolCall { .. } => saw_tool_call = true,
749                        PartKind::ToolResult { .. } => saw_tool_result = true,
750                        PartKind::Reasoning { .. } => saw_reasoning = true,
751                        _ => {}
752                    }
753                }
754            }
755        }
756        assert!(saw_tool_call, "corpus has assistant tool calls");
757        assert!(saw_tool_result, "corpus has tool results");
758        assert!(saw_reasoning, "corpus has assistant reasoning");
759        Ok(())
760    }
761
762    #[test]
763    fn unknown_nested_message_role_becomes_system_carrier() -> anyhow::Result<()> {
764        let row = json!({
765            "type": "message",
766            "id": "mystery-message",
767            "message": {
768                "role": "mysteryRole",
769                "content": [{"type": "text", "text": "not yet understood"}]
770            }
771        });
772        let events = events_from_row(
773            "session-1",
774            42,
775            &row,
776            DateTime::parse_from_rfc3339("2026-04-28T18:47:32.280Z")?.with_timezone(&Utc),
777        )
778        .map_err(anyhow::Error::msg)?;
779
780        assert_eq!(events.len(), 1);
781        let IngestEvent::Message(Message::System {
782            id,
783            content,
784            options,
785            ..
786        }) = &events[0]
787        else {
788            panic!("unknown role must produce a System carrier");
789        };
790        assert_eq!(id, "mystery-message");
791        assert_eq!(content.as_deref().map(String::as_str), Some("mysteryRole"));
792        assert_eq!(
793            raw_record(options)
794                .and_then(|raw| raw.get("message").cloned())
795                .and_then(|message| message.get("role").cloned()),
796            Some(json!("mysteryRole")),
797        );
798        Ok(())
799    }
800
801    #[tokio::test(flavor = "multi_thread")]
802    async fn fork_parent_ids_and_compaction_summary_are_preserved() -> anyhow::Result<()> {
803        let temp = TempDir::new()?;
804        let root = temp.path().join("sessions");
805        let path = root
806            .join("project")
807            .join("2026-05-01T00-00-00-000Z_fork.jsonl");
808        write_jsonl_file(
809            &path,
810            &[
811                json!({
812                    "type": "session",
813                    "version": 3,
814                    "id": "pi-fork-session",
815                    "timestamp": "2026-05-01T00:00:00.000Z",
816                    "cwd": "/tmp/pi-fork",
817                }),
818                json!({
819                    "type": "message",
820                    "id": "parent-message",
821                    "timestamp": "2026-05-01T00:00:01.000Z",
822                    "message": {
823                        "role": "user",
824                        "content": [{"type": "text", "text": "parent"}],
825                    },
826                }),
827                json!({
828                    "type": "message",
829                    "id": "child-a",
830                    "parentId": "parent-message",
831                    "timestamp": "2026-05-01T00:00:02.000Z",
832                    "message": {
833                        "role": "assistant",
834                        "content": [{"type": "text", "text": "branch a"}],
835                    },
836                }),
837                json!({
838                    "type": "message",
839                    "id": "child-b",
840                    "parentId": "parent-message",
841                    "timestamp": "2026-05-01T00:00:03.000Z",
842                    "message": {
843                        "role": "assistant",
844                        "content": [{"type": "text", "text": "branch b"}],
845                    },
846                }),
847                json!({
848                    "type": "compaction",
849                    "id": "compact-1",
850                    "parentId": "child-b",
851                    "timestamp": "2026-05-01T00:00:04.000Z",
852                    "summary": "compact summary",
853                }),
854            ],
855        )?;
856
857        let store = Store::open_local(temp.path().join("store")).await?;
858        let summary = ingest_adapter(
859            &store,
860            &PiCodingAgentAdapter::new(&root),
861            &crate::adapter::NoopOracle,
862            |_| {},
863        )
864        .await?;
865        assert_eq!(summary.dropped_events, 0);
866
867        let session = store
868            .get_session("pi-fork-session")
869            .await?
870            .expect("fixture session lands");
871        let child_a = session
872            .messages
873            .iter()
874            .find(|stored| stored.message.id() == "child-a")
875            .expect("first fork child lands");
876        let child_b = session
877            .messages
878            .iter()
879            .find(|stored| stored.message.id() == "child-b")
880            .expect("second fork child lands");
881        for child in [child_a, child_b] {
882            assert_eq!(
883                child
884                    .message
885                    .options()
886                    .get("source")
887                    .and_then(|source| source.get("parent_id"))
888                    .and_then(Value::as_str),
889                Some("parent-message"),
890            );
891        }
892        assert!(source_line(child_a.message.options()) < source_line(child_b.message.options()));
893
894        let compact = session
895            .messages
896            .iter()
897            .find(|stored| stored.message.id() == "compact-1")
898            .expect("compaction carrier lands");
899        let Message::System { content, .. } = &compact.message else {
900            panic!("compaction is preserved as a System carrier");
901        };
902        assert_eq!(
903            content.as_deref().map(String::as_str),
904            Some("compact summary")
905        );
906        Ok(())
907    }
908
909    #[tokio::test(flavor = "multi_thread")]
910    async fn foreign_serialization_reparses_as_pi_coding_agent() -> anyhow::Result<()> {
911        let temp = TempDir::new()?;
912        let origin_store = Store::open_local(temp.path().join("origin-store")).await?;
913        let origin = crate::adapter::OpencodeAdapter::new(concat!(
914            env!("CARGO_MANIFEST_DIR"),
915            "/tests/fixtures/adapter/opencode/storage"
916        ));
917        ingest_adapter(&origin_store, &origin, &crate::adapter::NoopOracle, |_| {}).await?;
918        let session_id = origin_store
919            .session_ids()
920            .await?
921            .into_iter()
922            .next()
923            .expect("opencode fixture has sessions");
924        let session = origin_store
925            .get_session(&session_id)
926            .await?
927            .expect("fixture session is readable");
928
929        let restored_root = temp.path().join("pi-corpus");
930        crate::adapter::write_restored_files(
931            &restored_root,
932            PiCodingAgentFactory.serialize(&session, RestoreFidelity::Foreign)?,
933        )?;
934        let restored_store = Store::open_local(temp.path().join("restored-store")).await?;
935        let summary = ingest_adapter(
936            &restored_store,
937            &PiCodingAgentAdapter::new(restored_root.join("sessions")),
938            &crate::adapter::NoopOracle,
939            |_| {},
940        )
941        .await?;
942
943        assert!(summary.accepted() > 0);
944        assert_eq!(summary.dropped_events, 0);
945        Ok(())
946    }
947
948    /// spec.md#model-part-provenance: a tool result is harness-injected; an
949    /// assistant turn's text/reasoning/tool-call parts are conversation.
950    #[tokio::test(flavor = "multi_thread")]
951    async fn tool_results_are_injected_assistant_parts_are_conversational() -> anyhow::Result<()> {
952        let temp = TempDir::new()?;
953        let store = Store::open_local(temp.path()).await?;
954        let adapter = PiCodingAgentAdapter::new(FIXTURES);
955        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
956
957        for session_id in store.session_ids().await? {
958            let session = store
959                .get_session(&session_id)
960                .await?
961                .expect("session round-trips");
962            for stored in &session.messages {
963                for part in &stored.parts {
964                    match &part.kind {
965                        PartKind::ToolResult { .. } => {
966                            assert_eq!(part.provenance, Provenance::Injected);
967                        }
968                        PartKind::ToolCall { .. } | PartKind::Reasoning { .. } => {
969                            assert_eq!(part.provenance, Provenance::Conversational);
970                        }
971                        _ => {}
972                    }
973                }
974            }
975        }
976        Ok(())
977    }
978
979    fn write_jsonl_file(path: &std::path::Path, records: &[Value]) -> anyhow::Result<()> {
980        if let Some(parent) = path.parent() {
981            std::fs::create_dir_all(parent)?;
982        }
983        std::fs::write(path, jsonl_bytes(NAME, records)?)?;
984        Ok(())
985    }
986}