1use std::path::{Path, PathBuf};
17
18use chrono::{DateTime, SecondsFormat, Utc};
19use serde_json::{Value, json};
20
21use crate::{
22 sessions::IngestEvent,
23 wire::{Message, Part, PartKind, Provenance, ProviderOptions, Session},
24};
25
26use super::{
27 Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
28 RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
29 empty_options,
30 extract::{Extracted, extract_compact_repr, extract_raw_record, extract_str},
31 extracted_text,
32 jsonl::{BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events, source_line},
33 jsonl_bytes, part_id, part_ordinal, raw_record,
34};
35
36const NAME: &str = "pi-coding-agent";
37
38pub struct PiCodingAgentFactory;
41
42impl AdapterFactory for PiCodingAgentFactory {
43 fn name(&self) -> &'static str {
44 NAME
45 }
46
47 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
48 Ok(Box::new(PiCodingAgentAdapter::new(config_path(
49 NAME, config,
50 )?)))
51 }
52
53 fn probe_default(&self, env: &Env) -> Option<Value> {
54 let path = env.home.join(".pi").join("agent").join("sessions");
55 path.exists().then(|| json!({ "path": path }))
56 }
57
58 fn serialize(
59 &self,
60 session: &crate::sessions::SessionWithMessages,
61 fidelity: RestoreFidelity,
62 ) -> Result<Vec<RestoredFile>, AdapterError> {
63 serialize_session(session, fidelity)
64 }
65}
66
67fn serialize_session(
68 session: &crate::sessions::SessionWithMessages,
69 fidelity: RestoreFidelity,
70) -> Result<Vec<RestoredFile>, AdapterError> {
71 let session_raw = raw_record(&session.session.options);
83 let actual = match fidelity {
84 RestoreFidelity::Native if session_raw.is_some() => RestoreFidelity::Native,
85 _ => RestoreFidelity::Foreign,
86 };
87
88 let mut records = Vec::new();
89 if actual == RestoreFidelity::Native {
90 records.push(session_raw.unwrap_or_else(|| pi_session_record(session)));
91 } else {
92 records.push(pi_session_record(session));
93 }
94
95 let mut messages: Vec<&crate::sessions::MessageWithParts> = session.messages.iter().collect();
98 if actual == RestoreFidelity::Native {
99 messages.sort_by(|left, right| {
100 source_line(left.message.options())
101 .cmp(&source_line(right.message.options()))
102 .then_with(|| by_timestamp_then_id(left, right))
103 });
104 } else {
105 messages.sort_by(|left, right| by_timestamp_then_id(left, right));
106 }
107
108 for message in &messages {
109 if actual == RestoreFidelity::Native
110 && let Some(raw) = raw_record(message.message.options())
111 {
112 records.push(raw);
113 continue;
114 }
115 if matches!(message.message, Message::System { .. }) {
120 continue;
121 }
122 records.push(pi_message_record(message));
123 }
124
125 Ok(vec![RestoredFile::new(
126 pi_relative_path(session),
127 jsonl_bytes(NAME, &records)?,
128 actual,
129 )])
130}
131
132fn pi_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
136 let source = session.session.options.get("source");
137 let slug = source
138 .and_then(|s| s.get("project_slug"))
139 .and_then(Value::as_str)
140 .map(ToOwned::to_owned)
141 .unwrap_or_else(|| encode_project(&session.session.project));
142 let file_name = source
143 .and_then(|s| s.get("file_name"))
144 .and_then(Value::as_str)
145 .map(ToOwned::to_owned)
146 .unwrap_or_else(|| {
147 let ts = session.session.created_at.format("%Y-%m-%dT%H-%M-%S-%3fZ");
148 format!("{ts}_{}.jsonl", session.session.id)
149 });
150 PathBuf::from("sessions").join(slug).join(file_name)
151}
152
/// Encodes a project path as a single directory-name component by replacing
/// path separators (`/`, `\`), the drive/stream separator (`:`) and `.` with
/// `-`.
///
/// `\` and `:` are included so that a Windows-style project such as
/// `C:\Users\me` cannot produce a slug that is itself an absolute path:
/// joining an absolute path onto a base replaces the base, which would discard
/// the `sessions/` prefix. Paths containing only `/` and `.` encode exactly as
/// before.
fn encode_project(project: &str) -> String {
    project.replace(['/', '\\', ':', '.'], "-")
}
156
157fn pi_session_record(session: &crate::sessions::SessionWithMessages) -> Value {
158 json!({
159 "type": "session",
160 "version": 3,
161 "id": session.session.id,
162 "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
163 "cwd": &*session.session.project,
164 })
165}
166
167fn pi_message_record(message: &crate::sessions::MessageWithParts) -> Value {
168 json!({
169 "type": "message",
170 "id": message.message.id(),
171 "parentId": message.message.options().get("source").and_then(|s| s.get("parent_id")),
172 "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
173 "message": pi_inner_message(message),
174 })
175}
176
177fn pi_inner_message(message: &crate::sessions::MessageWithParts) -> Value {
178 let epoch_ms = message.message.timestamp().timestamp_millis();
179 match &message.message {
180 Message::User { .. } => json!({
181 "role": "user",
182 "content": message.parts.iter().map(pi_content_item).collect::<Vec<_>>(),
183 "timestamp": epoch_ms,
184 }),
185 Message::Assistant { .. } => json!({
186 "role": "assistant",
187 "content": message.parts.iter().map(pi_content_item).collect::<Vec<_>>(),
188 "timestamp": epoch_ms,
189 }),
190 Message::Tool { .. } => {
191 let part = message.parts.first();
197 let (call_id, name, is_error, result) = match part.map(|p| &p.kind) {
198 Some(PartKind::ToolResult {
199 call_id,
200 name,
201 is_failure,
202 result,
203 }) => (
204 extracted_text(call_id).to_owned(),
205 extracted_text(name).to_owned(),
206 *is_failure,
207 result.clone(),
208 ),
209 _ => (String::new(), String::new(), false, Value::Null),
210 };
211 json!({
212 "role": "toolResult",
213 "toolCallId": call_id,
214 "toolName": name,
215 "content": result,
216 "isError": is_error,
217 "timestamp": epoch_ms,
218 })
219 }
220 Message::System { .. } => {
225 unreachable!("System messages are not serialized through pi_inner_message")
226 }
227 }
228}
229
230fn pi_content_item(part: &Part) -> Value {
231 match &part.kind {
232 PartKind::Text { text } => json!({"type": "text", "text": extracted_text(text)}),
233 PartKind::Reasoning { text } => json!({
234 "type": "thinking",
235 "thinking": extracted_text(text),
236 "thinkingSignature": part
237 .options
238 .get("pi")
239 .and_then(|p| p.get("thinking_signature")),
240 }),
241 PartKind::ToolCall {
242 call_id,
243 name,
244 params,
245 ..
246 } => json!({
247 "type": "toolCall",
248 "id": extracted_text(call_id),
249 "name": extracted_text(name),
250 "arguments": params,
251 }),
252 other => json!({
253 "type": "text",
254 "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
255 }),
256 }
257}
258
259#[derive(Debug, Clone)]
262pub struct PiCodingAgentAdapter {
263 root: PathBuf,
264}
265
266impl PiCodingAgentAdapter {
267 pub fn new(root: impl Into<PathBuf>) -> Self {
268 Self { root: root.into() }
269 }
270}
271
272impl Adapter for PiCodingAgentAdapter {
273 fn discover(&self) -> DiscoverFuture<'_> {
274 jsonl_tree_discover(self)
275 }
276
277 fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
278 jsonl_tree_events(self, oracle)
279 }
280}
281
282impl JsonlTree for PiCodingAgentAdapter {
283 type State = ();
286
287 fn name(&self) -> &'static str {
288 NAME
289 }
290
291 fn root(&self) -> &Path {
292 &self.root
293 }
294
295 fn peek_session_id(&self, _path: &Path, first_line: &str) -> Option<String> {
296 let row: Value = serde_json::from_str(first_line).ok()?;
297 if row.get("type").and_then(Value::as_str) == Some("session") {
298 row.get("id").and_then(Value::as_str).map(ToOwned::to_owned)
299 } else {
300 None
301 }
302 }
303
304 fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
305 session_from_rows(path, rows)
306 }
307
308 fn events_from_row(
309 &self,
310 session: &Session,
311 row: &BoundedRow,
312 _state: &mut Self::State,
313 ) -> Result<Vec<IngestEvent>, String> {
314 events_from_row(&session.id, row.line, &row.value, session.created_at)
315 }
316}
317
318fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
319 let path_display = path.display().to_string();
320 let first = rows
321 .first()
322 .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
323 let row = &first.value;
324 let at_first = format!("{path_display}:{}", first.line);
325 if row.get("type").and_then(Value::as_str) != Some("session") {
326 return Err(AdapterError::schema(
327 NAME,
328 at_first,
329 "first row must be a `session` record",
330 ));
331 }
332 let id = row
333 .get("id")
334 .and_then(Value::as_str)
335 .ok_or_else(|| AdapterError::schema(NAME, at_first.clone(), "session record missing id"))?
336 .to_owned();
337 let created_at = row
338 .get("timestamp")
339 .and_then(Value::as_str)
340 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
341 .map(|dt| dt.with_timezone(&Utc))
342 .ok_or_else(|| {
343 AdapterError::schema(
344 NAME,
345 at_first.clone(),
346 "session record has no parseable timestamp",
347 )
348 })?;
349 let project = extract_str(row, "cwd").ok_or_else(|| {
350 AdapterError::schema(NAME, at_first, "session record missing cwd")
353 })?;
354
355 let project_slug = path
359 .parent()
360 .and_then(|p| p.file_name())
361 .and_then(|n| n.to_str())
362 .map(ToOwned::to_owned);
363 let file_name = path
364 .file_name()
365 .and_then(|n| n.to_str())
366 .map(ToOwned::to_owned);
367
368 let mut options = ProviderOptions::new();
369 options.insert(
370 "source".to_owned(),
371 json!({
372 "adapter": NAME,
373 "version": row.get("version"),
374 "project_slug": project_slug,
375 "file_name": file_name,
376 "raw_record": extract_raw_record(row),
377 }),
378 );
379
380 Ok(Session {
381 id,
382 parent_session_id: None,
383 parent_message_id: None,
384 source_agent: NAME.to_owned(),
385 created_at,
386 project,
387 options,
388 })
389}
390
391fn events_from_row(
396 session_id: &str,
397 line: usize,
398 row: &Value,
399 default_timestamp: DateTime<Utc>,
400) -> Result<Vec<IngestEvent>, String> {
401 let kind = row.get("type").and_then(Value::as_str);
402 let timestamp = row
403 .get("timestamp")
404 .and_then(Value::as_str)
405 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
406 .map(|dt| dt.with_timezone(&Utc))
407 .unwrap_or(default_timestamp);
408 let id = row
409 .get("id")
410 .and_then(Value::as_str)
411 .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
412
413 match kind {
414 Some("session") => Ok(Vec::new()),
416 Some("message") => {
417 let message_value = row
418 .get("message")
419 .ok_or_else(|| "message record missing `message` field".to_owned())?;
420 message_events(session_id, &id, timestamp, row, message_value, line)
421 }
422 Some("compaction") => Ok(vec![carrier_event(
425 session_id,
426 &id,
427 timestamp,
428 row,
429 line,
430 extract_str(row, "summary"),
431 )]),
432 Some("model_change") | Some("thinking_level_change") => Ok(vec![carrier_event(
433 session_id,
434 &id,
435 timestamp,
436 row,
437 line,
438 extract_str(row, "type"),
439 )]),
440 _ => Ok(vec![carrier_event(
444 session_id,
445 &id,
446 timestamp,
447 row,
448 line,
449 extract_str(row, "type"),
450 )]),
451 }
452}
453
454fn carrier_event(
455 session_id: &str,
456 id: &str,
457 timestamp: DateTime<Utc>,
458 row: &Value,
459 line: usize,
460 content: Option<Extracted<String>>,
461) -> IngestEvent {
462 IngestEvent::Message(Message::System {
463 id: id.to_owned(),
464 session_id: session_id.to_owned(),
465 timestamp,
466 content,
467 options: row_options(row, line),
468 })
469}
470
471fn message_events(
472 session_id: &str,
473 id: &str,
474 timestamp: DateTime<Utc>,
475 row: &Value,
476 message_value: &Value,
477 line: usize,
478) -> Result<Vec<IngestEvent>, String> {
479 let role = message_value
480 .get("role")
481 .and_then(Value::as_str)
482 .ok_or_else(|| "message missing role".to_owned())?;
483 let content = message_value
484 .get("content")
485 .and_then(Value::as_array)
486 .cloned()
487 .unwrap_or_default();
488
489 let mut parts = Vec::new();
490 let message = match role {
491 "user" => {
492 for (ordinal, item) in content.iter().enumerate() {
496 parts.push(user_part(session_id, id, ordinal, item));
497 }
498 Message::User {
499 id: id.to_owned(),
500 session_id: session_id.to_owned(),
501 timestamp,
502 options: row_options(row, line),
503 }
504 }
505 "assistant" => {
506 for (ordinal, item) in content.iter().enumerate() {
507 parts.push(assistant_part(session_id, id, ordinal, item));
508 }
509 Message::Assistant {
510 id: id.to_owned(),
511 session_id: session_id.to_owned(),
512 timestamp,
513 options: assistant_options(row, message_value, line),
514 }
515 }
516 "toolResult" => {
517 parts.push(tool_result_part(session_id, id, message_value));
518 Message::Tool {
519 id: id.to_owned(),
520 session_id: session_id.to_owned(),
521 timestamp,
522 options: row_options(row, line),
523 }
524 }
525 _ => Message::System {
528 id: id.to_owned(),
529 session_id: session_id.to_owned(),
530 timestamp,
531 content: extract_str(message_value, "role"),
532 options: row_options(row, line),
533 },
534 };
535
536 let mut events = Vec::with_capacity(parts.len() + 1);
537 events.push(IngestEvent::Message(message));
538 events.extend(parts.into_iter().map(IngestEvent::Part));
539 Ok(events)
540}
541
542fn user_part(session_id: &str, message_id: &str, ordinal: usize, item: &Value) -> Part {
543 let kind = match item.get("type").and_then(Value::as_str) {
544 Some("text") => PartKind::Text {
545 text: extract_str(item, "text"),
546 },
547 _ => PartKind::Text {
550 text: Some(extract_compact_repr(item)),
551 },
552 };
553 Part {
554 session_id: session_id.to_owned(),
555 id: part_id(message_id, ordinal),
556 message_id: message_id.to_owned(),
557 ordinal: part_ordinal(ordinal),
558 provenance: Provenance::Conversational,
560 options: empty_options(),
561 kind,
562 }
563}
564
565fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, item: &Value) -> Part {
566 let (kind, options) = match item.get("type").and_then(Value::as_str) {
569 Some("text") => (
570 PartKind::Text {
571 text: extract_str(item, "text"),
572 },
573 empty_options(),
574 ),
575 Some("thinking") => (
576 PartKind::Reasoning {
577 text: extract_str(item, "thinking"),
578 },
579 thinking_options(item),
580 ),
581 Some("toolCall") => (
582 PartKind::ToolCall {
583 call_id: extract_str(item, "id"),
584 name: extract_str(item, "name"),
585 params: item.get("arguments").cloned().unwrap_or(Value::Null),
586 provider_executed: false,
587 },
588 empty_options(),
589 ),
590 _ => (
592 PartKind::Text {
593 text: Some(extract_compact_repr(item)),
594 },
595 empty_options(),
596 ),
597 };
598 Part {
599 session_id: session_id.to_owned(),
600 id: part_id(message_id, ordinal),
601 message_id: message_id.to_owned(),
602 ordinal: part_ordinal(ordinal),
603 provenance: Provenance::Conversational,
604 options,
605 kind,
606 }
607}
608
609fn tool_result_part(session_id: &str, message_id: &str, message_value: &Value) -> Part {
610 Part {
611 session_id: session_id.to_owned(),
612 id: part_id(message_id, 0),
613 message_id: message_id.to_owned(),
614 ordinal: 0,
615 provenance: Provenance::Injected,
617 options: empty_options(),
618 kind: PartKind::ToolResult {
619 call_id: extract_str(message_value, "toolCallId"),
620 name: extract_str(message_value, "toolName"),
621 is_failure: message_value
622 .get("isError")
623 .and_then(Value::as_bool)
624 .unwrap_or(false),
625 result: message_value.get("content").cloned().unwrap_or(Value::Null),
628 },
629 }
630}
631
632fn row_options(row: &Value, line: usize) -> ProviderOptions {
633 let mut options = ProviderOptions::new();
634 options.insert(
635 "source".to_owned(),
636 json!({
637 "line": line,
638 "parent_id": row.get("parentId"),
639 "raw_type": row.get("type"),
640 "raw_record": extract_raw_record(row),
641 }),
642 );
643 options
644}
645
646fn assistant_options(row: &Value, message_value: &Value, line: usize) -> ProviderOptions {
647 let mut options = row_options(row, line);
648 options.insert(
649 "pi".to_owned(),
650 json!({
651 "api": message_value.get("api"),
652 "provider": message_value.get("provider"),
653 "model": message_value.get("model"),
654 "usage": message_value.get("usage"),
655 "stop_reason": message_value.get("stopReason"),
656 "response_id": message_value.get("responseId"),
657 }),
658 );
659 options
660}
661
662fn thinking_options(item: &Value) -> ProviderOptions {
663 let mut options = ProviderOptions::new();
664 if let Some(signature) = item.get("thinkingSignature") {
665 options.insert("pi".to_owned(), json!({ "thinking_signature": signature }));
666 }
667 options
668}
669
670#[cfg(test)]
671mod tests {
672 #![allow(clippy::expect_used, clippy::unwrap_used)]
676
677 use super::*;
678 use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
679 use tempfile::TempDir;
680
681 const FIXTURES: &str = concat!(
684 env!("CARGO_MANIFEST_DIR"),
685 "/tests/fixtures/adapter/pi-coding-agent/sessions"
686 );
687
688 #[test]
689 fn probe_default_finds_pi_sessions_under_home() -> anyhow::Result<()> {
690 crate::adapter::test_support::assert_probe_default(
691 &PiCodingAgentFactory,
692 &[".pi", "agent", "sessions"],
693 )
694 }
695
696 #[tokio::test(flavor = "multi_thread")]
697 async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
698 let adapter = PiCodingAgentAdapter::new(FIXTURES);
699 crate::adapter::test_support::assert_native_restore(
700 &PiCodingAgentFactory,
701 &adapter,
702 std::path::Path::new(FIXTURES)
705 .parent()
706 .expect("FIXTURES is nested under a corpus root"),
707 )
708 .await
709 }
710
711 #[tokio::test(flavor = "multi_thread")]
712 async fn pi_coding_agent_adapter_ingests_fixture_corpus_into_canonical_shape()
713 -> anyhow::Result<()> {
714 let temp = TempDir::new()?;
715 let store = Store::open_local(temp.path()).await?;
716 let adapter = PiCodingAgentAdapter::new(FIXTURES);
717
718 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
719 assert!(summary.accepted() > 0, "ingest must accept rows");
720 assert_eq!(summary.dropped_events, 0, "no per-event drops expected");
721 assert_eq!(
722 summary.dropped_sessions, 0,
723 "no session-level rejections expected"
724 );
725 assert_eq!(summary.skipped_files, 0, "no whole-file skips expected");
726
727 let (sessions, messages, parts) = store.row_counts().await?;
728 assert!(sessions > 0, "at least one pi-coding-agent session");
729 assert!(messages > 0, "at least one pi-coding-agent message");
730 assert!(parts > 0, "at least one pi-coding-agent Part");
731
732 let mut saw_tool_call = false;
733 let mut saw_tool_result = false;
734 let mut saw_reasoning = false;
735 for session_id in store.session_ids().await? {
736 let session = store
737 .get_session(&session_id)
738 .await?
739 .expect("session round-trips");
740 assert_eq!(session.session.source_agent, NAME);
741 assert!(
742 !(*session.session.project).is_empty(),
743 "spec.md#model-project-non-empty: project must be a real cwd",
744 );
745 for stored in &session.messages {
746 for part in &stored.parts {
747 match &part.kind {
748 PartKind::ToolCall { .. } => saw_tool_call = true,
749 PartKind::ToolResult { .. } => saw_tool_result = true,
750 PartKind::Reasoning { .. } => saw_reasoning = true,
751 _ => {}
752 }
753 }
754 }
755 }
756 assert!(saw_tool_call, "corpus has assistant tool calls");
757 assert!(saw_tool_result, "corpus has tool results");
758 assert!(saw_reasoning, "corpus has assistant reasoning");
759 Ok(())
760 }
761
762 #[test]
763 fn unknown_nested_message_role_becomes_system_carrier() -> anyhow::Result<()> {
764 let row = json!({
765 "type": "message",
766 "id": "mystery-message",
767 "message": {
768 "role": "mysteryRole",
769 "content": [{"type": "text", "text": "not yet understood"}]
770 }
771 });
772 let events = events_from_row(
773 "session-1",
774 42,
775 &row,
776 DateTime::parse_from_rfc3339("2026-04-28T18:47:32.280Z")?.with_timezone(&Utc),
777 )
778 .map_err(anyhow::Error::msg)?;
779
780 assert_eq!(events.len(), 1);
781 let IngestEvent::Message(Message::System {
782 id,
783 content,
784 options,
785 ..
786 }) = &events[0]
787 else {
788 panic!("unknown role must produce a System carrier");
789 };
790 assert_eq!(id, "mystery-message");
791 assert_eq!(content.as_deref().map(String::as_str), Some("mysteryRole"));
792 assert_eq!(
793 raw_record(options)
794 .and_then(|raw| raw.get("message").cloned())
795 .and_then(|message| message.get("role").cloned()),
796 Some(json!("mysteryRole")),
797 );
798 Ok(())
799 }
800
801 #[tokio::test(flavor = "multi_thread")]
802 async fn fork_parent_ids_and_compaction_summary_are_preserved() -> anyhow::Result<()> {
803 let temp = TempDir::new()?;
804 let root = temp.path().join("sessions");
805 let path = root
806 .join("project")
807 .join("2026-05-01T00-00-00-000Z_fork.jsonl");
808 write_jsonl_file(
809 &path,
810 &[
811 json!({
812 "type": "session",
813 "version": 3,
814 "id": "pi-fork-session",
815 "timestamp": "2026-05-01T00:00:00.000Z",
816 "cwd": "/tmp/pi-fork",
817 }),
818 json!({
819 "type": "message",
820 "id": "parent-message",
821 "timestamp": "2026-05-01T00:00:01.000Z",
822 "message": {
823 "role": "user",
824 "content": [{"type": "text", "text": "parent"}],
825 },
826 }),
827 json!({
828 "type": "message",
829 "id": "child-a",
830 "parentId": "parent-message",
831 "timestamp": "2026-05-01T00:00:02.000Z",
832 "message": {
833 "role": "assistant",
834 "content": [{"type": "text", "text": "branch a"}],
835 },
836 }),
837 json!({
838 "type": "message",
839 "id": "child-b",
840 "parentId": "parent-message",
841 "timestamp": "2026-05-01T00:00:03.000Z",
842 "message": {
843 "role": "assistant",
844 "content": [{"type": "text", "text": "branch b"}],
845 },
846 }),
847 json!({
848 "type": "compaction",
849 "id": "compact-1",
850 "parentId": "child-b",
851 "timestamp": "2026-05-01T00:00:04.000Z",
852 "summary": "compact summary",
853 }),
854 ],
855 )?;
856
857 let store = Store::open_local(temp.path().join("store")).await?;
858 let summary = ingest_adapter(
859 &store,
860 &PiCodingAgentAdapter::new(&root),
861 &crate::adapter::NoopOracle,
862 |_| {},
863 )
864 .await?;
865 assert_eq!(summary.dropped_events, 0);
866
867 let session = store
868 .get_session("pi-fork-session")
869 .await?
870 .expect("fixture session lands");
871 let child_a = session
872 .messages
873 .iter()
874 .find(|stored| stored.message.id() == "child-a")
875 .expect("first fork child lands");
876 let child_b = session
877 .messages
878 .iter()
879 .find(|stored| stored.message.id() == "child-b")
880 .expect("second fork child lands");
881 for child in [child_a, child_b] {
882 assert_eq!(
883 child
884 .message
885 .options()
886 .get("source")
887 .and_then(|source| source.get("parent_id"))
888 .and_then(Value::as_str),
889 Some("parent-message"),
890 );
891 }
892 assert!(source_line(child_a.message.options()) < source_line(child_b.message.options()));
893
894 let compact = session
895 .messages
896 .iter()
897 .find(|stored| stored.message.id() == "compact-1")
898 .expect("compaction carrier lands");
899 let Message::System { content, .. } = &compact.message else {
900 panic!("compaction is preserved as a System carrier");
901 };
902 assert_eq!(
903 content.as_deref().map(String::as_str),
904 Some("compact summary")
905 );
906 Ok(())
907 }
908
909 #[tokio::test(flavor = "multi_thread")]
910 async fn foreign_serialization_reparses_as_pi_coding_agent() -> anyhow::Result<()> {
911 let temp = TempDir::new()?;
912 let origin_store = Store::open_local(temp.path().join("origin-store")).await?;
913 let origin = crate::adapter::OpencodeAdapter::new(concat!(
914 env!("CARGO_MANIFEST_DIR"),
915 "/tests/fixtures/adapter/opencode/storage"
916 ));
917 ingest_adapter(&origin_store, &origin, &crate::adapter::NoopOracle, |_| {}).await?;
918 let session_id = origin_store
919 .session_ids()
920 .await?
921 .into_iter()
922 .next()
923 .expect("opencode fixture has sessions");
924 let session = origin_store
925 .get_session(&session_id)
926 .await?
927 .expect("fixture session is readable");
928
929 let restored_root = temp.path().join("pi-corpus");
930 crate::adapter::write_restored_files(
931 &restored_root,
932 PiCodingAgentFactory.serialize(&session, RestoreFidelity::Foreign)?,
933 )?;
934 let restored_store = Store::open_local(temp.path().join("restored-store")).await?;
935 let summary = ingest_adapter(
936 &restored_store,
937 &PiCodingAgentAdapter::new(restored_root.join("sessions")),
938 &crate::adapter::NoopOracle,
939 |_| {},
940 )
941 .await?;
942
943 assert!(summary.accepted() > 0);
944 assert_eq!(summary.dropped_events, 0);
945 Ok(())
946 }
947
948 #[tokio::test(flavor = "multi_thread")]
951 async fn tool_results_are_injected_assistant_parts_are_conversational() -> anyhow::Result<()> {
952 let temp = TempDir::new()?;
953 let store = Store::open_local(temp.path()).await?;
954 let adapter = PiCodingAgentAdapter::new(FIXTURES);
955 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
956
957 for session_id in store.session_ids().await? {
958 let session = store
959 .get_session(&session_id)
960 .await?
961 .expect("session round-trips");
962 for stored in &session.messages {
963 for part in &stored.parts {
964 match &part.kind {
965 PartKind::ToolResult { .. } => {
966 assert_eq!(part.provenance, Provenance::Injected);
967 }
968 PartKind::ToolCall { .. } | PartKind::Reasoning { .. } => {
969 assert_eq!(part.provenance, Provenance::Conversational);
970 }
971 _ => {}
972 }
973 }
974 }
975 }
976 Ok(())
977 }
978
979 fn write_jsonl_file(path: &std::path::Path, records: &[Value]) -> anyhow::Result<()> {
980 if let Some(parent) = path.parent() {
981 std::fs::create_dir_all(parent)?;
982 }
983 std::fs::write(path, jsonl_bytes(NAME, records)?)?;
984 Ok(())
985 }
986}