1use std::path::{Path, PathBuf};
17
18use chrono::{DateTime, SecondsFormat, Utc};
19use serde_json::{Value, json};
20
21use crate::{
22 sessions::IngestEvent,
23 wire::{Message, Part, PartKind, Provenance, ProviderOptions, Session},
24};
25
26use super::{
27 Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
28 RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
29 empty_options,
30 extract::{Extracted, extract_compact_repr, extract_raw_record, extract_str},
31 extracted_text,
32 jsonl::{BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events, source_line},
33 jsonl_bytes, part_id, part_ordinal, raw_record,
34};
35
36const NAME: &str = "pi-coding-agent";
37
38pub struct PiCodingAgentFactory;
41
42impl AdapterFactory for PiCodingAgentFactory {
43 fn name(&self) -> &'static str {
44 NAME
45 }
46
47 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
48 Ok(Box::new(PiCodingAgentAdapter::new(config_path(
49 NAME, config,
50 )?)))
51 }
52
53 fn probe_default(&self, env: &Env) -> Option<Value> {
54 let path = env.home.join(".pi").join("agent").join("sessions");
55 path.exists().then(|| json!({ "path": path }))
56 }
57
58 fn serialize(
59 &self,
60 session: &crate::sessions::SessionWithMessages,
61 fidelity: RestoreFidelity,
62 ) -> Result<Vec<RestoredFile>, AdapterError> {
63 serialize_session(session, fidelity)
64 }
65}
66
67fn serialize_session(
68 session: &crate::sessions::SessionWithMessages,
69 fidelity: RestoreFidelity,
70) -> Result<Vec<RestoredFile>, AdapterError> {
71 let session_raw = raw_record(&session.session.options);
83 let actual = match fidelity {
84 RestoreFidelity::Native if session_raw.is_some() => RestoreFidelity::Native,
85 _ => RestoreFidelity::Foreign,
86 };
87
88 let mut records = Vec::new();
89 if actual == RestoreFidelity::Native {
90 records.push(session_raw.unwrap_or_else(|| pi_session_record(session)));
91 } else {
92 records.push(pi_session_record(session));
93 }
94
95 let mut messages: Vec<&crate::sessions::MessageWithParts> = session.messages.iter().collect();
98 if actual == RestoreFidelity::Native {
99 messages.sort_by(|left, right| {
100 source_line(left.message.options())
101 .cmp(&source_line(right.message.options()))
102 .then_with(|| by_timestamp_then_id(left, right))
103 });
104 } else {
105 messages.sort_by(|left, right| by_timestamp_then_id(left, right));
106 }
107
108 for message in &messages {
109 if actual == RestoreFidelity::Native
110 && let Some(raw) = raw_record(message.message.options())
111 {
112 records.push(raw);
113 continue;
114 }
115 if matches!(message.message, Message::System { .. }) {
120 continue;
121 }
122 records.push(pi_message_record(message));
123 }
124
125 Ok(vec![RestoredFile::new(
126 pi_relative_path(session),
127 jsonl_bytes(NAME, &records)?,
128 actual,
129 )])
130}
131
132fn pi_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
136 let source = session.session.options.get("source");
137 let slug = source
138 .and_then(|s| s.get("project_slug"))
139 .and_then(Value::as_str)
140 .map(ToOwned::to_owned)
141 .unwrap_or_else(|| encode_project(&session.session.project));
142 let file_name = source
143 .and_then(|s| s.get("file_name"))
144 .and_then(Value::as_str)
145 .map(ToOwned::to_owned)
146 .unwrap_or_else(|| {
147 let ts = session.session.created_at.format("%Y-%m-%dT%H-%M-%S-%3fZ");
148 format!("{ts}_{}.jsonl", session.session.id)
149 });
150 PathBuf::from("sessions").join(slug).join(file_name)
151}
152
153fn encode_project(project: &str) -> String {
154 project.replace(['/', '.'], "-")
155}
156
157fn pi_session_record(session: &crate::sessions::SessionWithMessages) -> Value {
158 json!({
159 "type": "session",
160 "version": 3,
161 "id": session.session.id,
162 "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
163 "cwd": &*session.session.project,
164 })
165}
166
167fn pi_message_record(message: &crate::sessions::MessageWithParts) -> Value {
168 json!({
169 "type": "message",
170 "id": message.message.id(),
171 "parentId": message.message.options().get("source").and_then(|s| s.get("parent_id")),
172 "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
173 "message": pi_inner_message(message),
174 })
175}
176
177fn pi_inner_message(message: &crate::sessions::MessageWithParts) -> Value {
178 let epoch_ms = message.message.timestamp().timestamp_millis();
179 match &message.message {
180 Message::User { .. } => json!({
181 "role": "user",
182 "content": message.parts.iter().map(pi_content_item).collect::<Vec<_>>(),
183 "timestamp": epoch_ms,
184 }),
185 Message::Assistant { .. } => json!({
186 "role": "assistant",
187 "content": message.parts.iter().map(pi_content_item).collect::<Vec<_>>(),
188 "timestamp": epoch_ms,
189 }),
190 Message::Tool { .. } => {
191 let part = message.parts.first();
197 let (call_id, name, is_error, result) = match part.map(|p| &p.kind) {
198 Some(PartKind::ToolResult {
199 call_id,
200 name,
201 is_failure,
202 result,
203 }) => (
204 extracted_text(call_id).to_owned(),
205 extracted_text(name).to_owned(),
206 *is_failure,
207 result.clone(),
208 ),
209 _ => (String::new(), String::new(), false, Value::Null),
210 };
211 json!({
212 "role": "toolResult",
213 "toolCallId": call_id,
214 "toolName": name,
215 "content": result,
216 "isError": is_error,
217 "timestamp": epoch_ms,
218 })
219 }
220 Message::System { .. } => {
225 unreachable!("System messages are not serialized through pi_inner_message")
226 }
227 }
228}
229
230fn pi_content_item(part: &Part) -> Value {
231 match &part.kind {
232 PartKind::Text { text } => json!({"type": "text", "text": extracted_text(text)}),
233 PartKind::Reasoning { text } => json!({
234 "type": "thinking",
235 "thinking": extracted_text(text),
236 "thinkingSignature": part
237 .options
238 .get("pi")
239 .and_then(|p| p.get("thinking_signature")),
240 }),
241 PartKind::ToolCall {
242 call_id,
243 name,
244 params,
245 ..
246 } => json!({
247 "type": "toolCall",
248 "id": extracted_text(call_id),
249 "name": extracted_text(name),
250 "arguments": params,
251 }),
252 other => json!({
253 "type": "text",
254 "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
255 }),
256 }
257}
258
259#[derive(Debug, Clone)]
262pub struct PiCodingAgentAdapter {
263 root: PathBuf,
264}
265
266impl PiCodingAgentAdapter {
267 pub fn new(root: impl Into<PathBuf>) -> Self {
268 Self { root: root.into() }
269 }
270}
271
272impl Adapter for PiCodingAgentAdapter {
273 fn discover(&self) -> DiscoverFuture<'_> {
274 jsonl_tree_discover(self)
275 }
276
277 fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
278 jsonl_tree_events(self, oracle)
279 }
280}
281
282impl JsonlTree for PiCodingAgentAdapter {
283 type State = ();
286
287 fn name(&self) -> &'static str {
288 NAME
289 }
290
291 fn root(&self) -> &Path {
292 &self.root
293 }
294
295 fn peek_session_id(&self, _path: &Path, first_line: &str) -> Option<String> {
296 let row: Value = serde_json::from_str(first_line).ok()?;
297 if row.get("type").and_then(Value::as_str) == Some("session") {
298 row.get("id").and_then(Value::as_str).map(ToOwned::to_owned)
299 } else {
300 None
301 }
302 }
303
304 fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
305 session_from_rows(path, rows)
306 }
307
308 fn events_from_row(
309 &self,
310 session: &Session,
311 row: &BoundedRow,
312 _state: &mut Self::State,
313 ) -> Result<Vec<IngestEvent>, String> {
314 events_from_row(&session.id, row.line, &row.value, session.created_at)
315 }
316}
317
318fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
319 let path_display = path.display().to_string();
320 let first = rows
321 .first()
322 .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
323 let row = &first.value;
324 let at_first = format!("{path_display}:{}", first.line);
325 if row.get("type").and_then(Value::as_str) != Some("session") {
326 return Err(AdapterError::schema(
327 NAME,
328 at_first,
329 "first row must be a `session` record",
330 ));
331 }
332 let id = row
333 .get("id")
334 .and_then(Value::as_str)
335 .ok_or_else(|| AdapterError::schema(NAME, at_first.clone(), "session record missing id"))?
336 .to_owned();
337 let created_at = row
338 .get("timestamp")
339 .and_then(Value::as_str)
340 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
341 .map(|dt| dt.with_timezone(&Utc))
342 .ok_or_else(|| {
343 AdapterError::schema(
344 NAME,
345 at_first.clone(),
346 "session record has no parseable timestamp",
347 )
348 })?;
349 let project = extract_str(row, "cwd").ok_or_else(|| {
350 AdapterError::schema(NAME, at_first, "session record missing cwd")
353 })?;
354
355 let project_slug = path
359 .parent()
360 .and_then(|p| p.file_name())
361 .and_then(|n| n.to_str())
362 .map(ToOwned::to_owned);
363 let file_name = path
364 .file_name()
365 .and_then(|n| n.to_str())
366 .map(ToOwned::to_owned);
367
368 let mut options = ProviderOptions::new();
369 options.insert(
370 "source".to_owned(),
371 json!({
372 "adapter": NAME,
373 "version": row.get("version"),
374 "project_slug": project_slug,
375 "file_name": file_name,
376 "raw_record": extract_raw_record(row),
377 }),
378 );
379
380 Ok(Session {
381 id,
382 parent_session_id: None,
383 parent_message_id: None,
384 source_agent: NAME.to_owned(),
385 created_at,
386 project,
387 options,
388 })
389}
390
391fn events_from_row(
396 session_id: &str,
397 line: usize,
398 row: &Value,
399 default_timestamp: DateTime<Utc>,
400) -> Result<Vec<IngestEvent>, String> {
401 let kind = row.get("type").and_then(Value::as_str);
402 let timestamp = row
403 .get("timestamp")
404 .and_then(Value::as_str)
405 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
406 .map(|dt| dt.with_timezone(&Utc))
407 .unwrap_or(default_timestamp);
408 let id = row
409 .get("id")
410 .and_then(Value::as_str)
411 .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
412
413 match kind {
414 Some("session") => Ok(Vec::new()),
416 Some("message") => {
417 let message_value = row
418 .get("message")
419 .ok_or_else(|| "message record missing `message` field".to_owned())?;
420 message_events(session_id, &id, timestamp, row, message_value, line)
421 }
422 Some("compaction") => Ok(vec![carrier_event(
425 session_id,
426 &id,
427 timestamp,
428 row,
429 line,
430 extract_str(row, "summary"),
431 )]),
432 Some("model_change") | Some("thinking_level_change") => Ok(vec![carrier_event(
433 session_id,
434 &id,
435 timestamp,
436 row,
437 line,
438 extract_str(row, "type"),
439 )]),
440 _ => Ok(vec![carrier_event(
444 session_id,
445 &id,
446 timestamp,
447 row,
448 line,
449 extract_str(row, "type"),
450 )]),
451 }
452}
453
454fn carrier_event(
455 session_id: &str,
456 id: &str,
457 timestamp: DateTime<Utc>,
458 row: &Value,
459 line: usize,
460 content: Option<Extracted<String>>,
461) -> IngestEvent {
462 IngestEvent::Message(Message::System {
463 id: id.to_owned(),
464 session_id: session_id.to_owned(),
465 timestamp,
466 content,
467 options: row_options(row, line),
468 })
469}
470
471fn message_events(
472 session_id: &str,
473 id: &str,
474 timestamp: DateTime<Utc>,
475 row: &Value,
476 message_value: &Value,
477 line: usize,
478) -> Result<Vec<IngestEvent>, String> {
479 let role = message_value
480 .get("role")
481 .and_then(Value::as_str)
482 .ok_or_else(|| "message missing role".to_owned())?;
483 let content = message_value
484 .get("content")
485 .and_then(Value::as_array)
486 .cloned()
487 .unwrap_or_default();
488
489 let mut parts = Vec::new();
490 let message = match role {
491 "user" => {
492 for (ordinal, item) in content.iter().enumerate() {
496 parts.push(user_part(session_id, id, ordinal, item));
497 }
498 Message::User {
499 id: id.to_owned(),
500 session_id: session_id.to_owned(),
501 timestamp,
502 options: row_options(row, line),
503 }
504 }
505 "assistant" => {
506 for (ordinal, item) in content.iter().enumerate() {
507 parts.push(assistant_part(session_id, id, ordinal, item));
508 }
509 Message::Assistant {
510 id: id.to_owned(),
511 session_id: session_id.to_owned(),
512 timestamp,
513 options: assistant_options(row, message_value, line),
514 }
515 }
516 "toolResult" => {
517 parts.push(tool_result_part(session_id, id, message_value));
518 Message::Tool {
519 id: id.to_owned(),
520 session_id: session_id.to_owned(),
521 timestamp,
522 options: row_options(row, line),
523 }
524 }
525 _ => Message::System {
528 id: id.to_owned(),
529 session_id: session_id.to_owned(),
530 timestamp,
531 content: extract_str(message_value, "role"),
532 options: row_options(row, line),
533 },
534 };
535
536 let mut events = Vec::with_capacity(parts.len() + 1);
537 events.push(IngestEvent::Message(message));
538 events.extend(parts.into_iter().map(IngestEvent::Part));
539 Ok(events)
540}
541
542fn user_part(session_id: &str, message_id: &str, ordinal: usize, item: &Value) -> Part {
543 let kind = match item.get("type").and_then(Value::as_str) {
544 Some("text") => PartKind::Text {
545 text: extract_str(item, "text"),
546 },
547 _ => PartKind::Text {
550 text: Some(extract_compact_repr(item)),
551 },
552 };
553 Part {
554 session_id: session_id.to_owned(),
555 id: part_id(message_id, ordinal),
556 message_id: message_id.to_owned(),
557 ordinal: part_ordinal(ordinal),
558 provenance: Provenance::Conversational,
560 options: empty_options(),
561 kind,
562 }
563}
564
565fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, item: &Value) -> Part {
566 let (kind, options) = match item.get("type").and_then(Value::as_str) {
569 Some("text") => (
570 PartKind::Text {
571 text: extract_str(item, "text"),
572 },
573 empty_options(),
574 ),
575 Some("thinking") => (
576 PartKind::Reasoning {
577 text: extract_str(item, "thinking"),
578 },
579 thinking_options(item),
580 ),
581 Some("toolCall") => (
582 PartKind::ToolCall {
583 call_id: extract_str(item, "id"),
584 name: extract_str(item, "name"),
585 params: item.get("arguments").cloned().unwrap_or(Value::Null),
586 provider_executed: false,
587 },
588 empty_options(),
589 ),
590 _ => (
592 PartKind::Text {
593 text: Some(extract_compact_repr(item)),
594 },
595 empty_options(),
596 ),
597 };
598 Part {
599 session_id: session_id.to_owned(),
600 id: part_id(message_id, ordinal),
601 message_id: message_id.to_owned(),
602 ordinal: part_ordinal(ordinal),
603 provenance: Provenance::Conversational,
604 options,
605 kind,
606 }
607}
608
609fn tool_result_part(session_id: &str, message_id: &str, message_value: &Value) -> Part {
610 Part {
611 session_id: session_id.to_owned(),
612 id: part_id(message_id, 0),
613 message_id: message_id.to_owned(),
614 ordinal: 0,
615 provenance: Provenance::Injected,
617 options: empty_options(),
618 kind: PartKind::ToolResult {
619 call_id: extract_str(message_value, "toolCallId"),
620 name: extract_str(message_value, "toolName"),
621 is_failure: message_value
622 .get("isError")
623 .and_then(Value::as_bool)
624 .unwrap_or(false),
625 result: message_value.get("content").cloned().unwrap_or(Value::Null),
628 },
629 }
630}
631
632fn row_options(row: &Value, line: usize) -> ProviderOptions {
633 let mut options = ProviderOptions::new();
634 options.insert(
635 "source".to_owned(),
636 json!({
637 "line": line,
638 "parent_id": row.get("parentId"),
639 "raw_type": row.get("type"),
640 "raw_record": extract_raw_record(row),
641 }),
642 );
643 options
644}
645
646fn assistant_options(row: &Value, message_value: &Value, line: usize) -> ProviderOptions {
647 let mut options = row_options(row, line);
648 options.insert(
649 "pi".to_owned(),
650 json!({
651 "api": message_value.get("api"),
652 "provider": message_value.get("provider"),
653 "model": message_value.get("model"),
654 "usage": message_value.get("usage"),
655 "stop_reason": message_value.get("stopReason"),
656 "response_id": message_value.get("responseId"),
657 }),
658 );
659 options
660}
661
662fn thinking_options(item: &Value) -> ProviderOptions {
663 let mut options = ProviderOptions::new();
664 if let Some(signature) = item.get("thinkingSignature") {
665 options.insert("pi".to_owned(), json!({ "thinking_signature": signature }));
666 }
667 options
668}
669
670#[cfg(test)]
671mod tests {
672 #![allow(clippy::expect_used, clippy::unwrap_used)]
676
677 use super::*;
678 use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
679 use tempfile::TempDir;
680
681 const FIXTURES: &str = "tests/fixtures/adapter/pi-coding-agent/sessions";
682
683 #[test]
684 fn probe_default_finds_pi_sessions_under_home() -> anyhow::Result<()> {
685 crate::adapter::test_support::assert_probe_default(
686 &PiCodingAgentFactory,
687 &[".pi", "agent", "sessions"],
688 )
689 }
690
691 #[tokio::test(flavor = "multi_thread")]
692 async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
693 let adapter = PiCodingAgentAdapter::new(FIXTURES);
694 crate::adapter::test_support::assert_native_restore(
695 &PiCodingAgentFactory,
696 &adapter,
697 std::path::Path::new(FIXTURES)
700 .parent()
701 .expect("FIXTURES is nested under a corpus root"),
702 )
703 .await
704 }
705
706 #[tokio::test(flavor = "multi_thread")]
707 async fn pi_coding_agent_adapter_ingests_fixture_corpus_into_canonical_shape()
708 -> anyhow::Result<()> {
709 let temp = TempDir::new()?;
710 let store = Store::open_local(temp.path()).await?;
711 let adapter = PiCodingAgentAdapter::new(FIXTURES);
712
713 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
714 assert!(summary.accepted() > 0, "ingest must accept rows");
715 assert_eq!(summary.dropped_events, 0, "no per-event drops expected");
716 assert_eq!(
717 summary.dropped_sessions, 0,
718 "no session-level rejections expected"
719 );
720 assert_eq!(summary.skipped_files, 0, "no whole-file skips expected");
721
722 let (sessions, messages, parts) = store.row_counts().await?;
723 assert!(sessions > 0, "at least one pi-coding-agent session");
724 assert!(messages > 0, "at least one pi-coding-agent message");
725 assert!(parts > 0, "at least one pi-coding-agent Part");
726
727 let mut saw_tool_call = false;
728 let mut saw_tool_result = false;
729 let mut saw_reasoning = false;
730 for session_id in store.session_ids().await? {
731 let session = store
732 .get_session(&session_id)
733 .await?
734 .expect("session round-trips");
735 assert_eq!(session.session.source_agent, NAME);
736 assert!(
737 !(*session.session.project).is_empty(),
738 "spec.md#model-project-non-empty: project must be a real cwd",
739 );
740 for stored in &session.messages {
741 for part in &stored.parts {
742 match &part.kind {
743 PartKind::ToolCall { .. } => saw_tool_call = true,
744 PartKind::ToolResult { .. } => saw_tool_result = true,
745 PartKind::Reasoning { .. } => saw_reasoning = true,
746 _ => {}
747 }
748 }
749 }
750 }
751 assert!(saw_tool_call, "corpus has assistant tool calls");
752 assert!(saw_tool_result, "corpus has tool results");
753 assert!(saw_reasoning, "corpus has assistant reasoning");
754 Ok(())
755 }
756
757 #[test]
758 fn unknown_nested_message_role_becomes_system_carrier() -> anyhow::Result<()> {
759 let row = json!({
760 "type": "message",
761 "id": "mystery-message",
762 "message": {
763 "role": "mysteryRole",
764 "content": [{"type": "text", "text": "not yet understood"}]
765 }
766 });
767 let events = events_from_row(
768 "session-1",
769 42,
770 &row,
771 DateTime::parse_from_rfc3339("2026-04-28T18:47:32.280Z")?.with_timezone(&Utc),
772 )
773 .map_err(anyhow::Error::msg)?;
774
775 assert_eq!(events.len(), 1);
776 let IngestEvent::Message(Message::System {
777 id,
778 content,
779 options,
780 ..
781 }) = &events[0]
782 else {
783 panic!("unknown role must produce a System carrier");
784 };
785 assert_eq!(id, "mystery-message");
786 assert_eq!(content.as_deref().map(String::as_str), Some("mysteryRole"));
787 assert_eq!(
788 raw_record(options)
789 .and_then(|raw| raw.get("message").cloned())
790 .and_then(|message| message.get("role").cloned()),
791 Some(json!("mysteryRole")),
792 );
793 Ok(())
794 }
795
796 #[tokio::test(flavor = "multi_thread")]
797 async fn fork_parent_ids_and_compaction_summary_are_preserved() -> anyhow::Result<()> {
798 let temp = TempDir::new()?;
799 let root = temp.path().join("sessions");
800 let path = root
801 .join("project")
802 .join("2026-05-01T00-00-00-000Z_fork.jsonl");
803 write_jsonl_file(
804 &path,
805 &[
806 json!({
807 "type": "session",
808 "version": 3,
809 "id": "pi-fork-session",
810 "timestamp": "2026-05-01T00:00:00.000Z",
811 "cwd": "/tmp/pi-fork",
812 }),
813 json!({
814 "type": "message",
815 "id": "parent-message",
816 "timestamp": "2026-05-01T00:00:01.000Z",
817 "message": {
818 "role": "user",
819 "content": [{"type": "text", "text": "parent"}],
820 },
821 }),
822 json!({
823 "type": "message",
824 "id": "child-a",
825 "parentId": "parent-message",
826 "timestamp": "2026-05-01T00:00:02.000Z",
827 "message": {
828 "role": "assistant",
829 "content": [{"type": "text", "text": "branch a"}],
830 },
831 }),
832 json!({
833 "type": "message",
834 "id": "child-b",
835 "parentId": "parent-message",
836 "timestamp": "2026-05-01T00:00:03.000Z",
837 "message": {
838 "role": "assistant",
839 "content": [{"type": "text", "text": "branch b"}],
840 },
841 }),
842 json!({
843 "type": "compaction",
844 "id": "compact-1",
845 "parentId": "child-b",
846 "timestamp": "2026-05-01T00:00:04.000Z",
847 "summary": "compact summary",
848 }),
849 ],
850 )?;
851
852 let store = Store::open_local(temp.path().join("store")).await?;
853 let summary = ingest_adapter(
854 &store,
855 &PiCodingAgentAdapter::new(&root),
856 &crate::adapter::NoopOracle,
857 |_| {},
858 )
859 .await?;
860 assert_eq!(summary.dropped_events, 0);
861
862 let session = store
863 .get_session("pi-fork-session")
864 .await?
865 .expect("fixture session lands");
866 let child_a = session
867 .messages
868 .iter()
869 .find(|stored| stored.message.id() == "child-a")
870 .expect("first fork child lands");
871 let child_b = session
872 .messages
873 .iter()
874 .find(|stored| stored.message.id() == "child-b")
875 .expect("second fork child lands");
876 for child in [child_a, child_b] {
877 assert_eq!(
878 child
879 .message
880 .options()
881 .get("source")
882 .and_then(|source| source.get("parent_id"))
883 .and_then(Value::as_str),
884 Some("parent-message"),
885 );
886 }
887 assert!(source_line(child_a.message.options()) < source_line(child_b.message.options()));
888
889 let compact = session
890 .messages
891 .iter()
892 .find(|stored| stored.message.id() == "compact-1")
893 .expect("compaction carrier lands");
894 let Message::System { content, .. } = &compact.message else {
895 panic!("compaction is preserved as a System carrier");
896 };
897 assert_eq!(
898 content.as_deref().map(String::as_str),
899 Some("compact summary")
900 );
901 Ok(())
902 }
903
904 #[tokio::test(flavor = "multi_thread")]
905 async fn foreign_serialization_reparses_as_pi_coding_agent() -> anyhow::Result<()> {
906 let temp = TempDir::new()?;
907 let origin_store = Store::open_local(temp.path().join("origin-store")).await?;
908 let origin =
909 crate::adapter::OpencodeAdapter::new("tests/fixtures/adapter/opencode/storage");
910 ingest_adapter(&origin_store, &origin, &crate::adapter::NoopOracle, |_| {}).await?;
911 let session_id = origin_store
912 .session_ids()
913 .await?
914 .into_iter()
915 .next()
916 .expect("opencode fixture has sessions");
917 let session = origin_store
918 .get_session(&session_id)
919 .await?
920 .expect("fixture session is readable");
921
922 let restored_root = temp.path().join("pi-corpus");
923 crate::adapter::write_restored_files(
924 &restored_root,
925 PiCodingAgentFactory.serialize(&session, RestoreFidelity::Foreign)?,
926 )?;
927 let restored_store = Store::open_local(temp.path().join("restored-store")).await?;
928 let summary = ingest_adapter(
929 &restored_store,
930 &PiCodingAgentAdapter::new(restored_root.join("sessions")),
931 &crate::adapter::NoopOracle,
932 |_| {},
933 )
934 .await?;
935
936 assert!(summary.accepted() > 0);
937 assert_eq!(summary.dropped_events, 0);
938 Ok(())
939 }
940
941 #[tokio::test(flavor = "multi_thread")]
944 async fn tool_results_are_injected_assistant_parts_are_conversational() -> anyhow::Result<()> {
945 let temp = TempDir::new()?;
946 let store = Store::open_local(temp.path()).await?;
947 let adapter = PiCodingAgentAdapter::new(FIXTURES);
948 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
949
950 for session_id in store.session_ids().await? {
951 let session = store
952 .get_session(&session_id)
953 .await?
954 .expect("session round-trips");
955 for stored in &session.messages {
956 for part in &stored.parts {
957 match &part.kind {
958 PartKind::ToolResult { .. } => {
959 assert_eq!(part.provenance, Provenance::Injected);
960 }
961 PartKind::ToolCall { .. } | PartKind::Reasoning { .. } => {
962 assert_eq!(part.provenance, Provenance::Conversational);
963 }
964 _ => {}
965 }
966 }
967 }
968 }
969 Ok(())
970 }
971
972 fn write_jsonl_file(path: &std::path::Path, records: &[Value]) -> anyhow::Result<()> {
973 if let Some(parent) = path.parent() {
974 std::fs::create_dir_all(parent)?;
975 }
976 std::fs::write(path, jsonl_bytes(NAME, records)?)?;
977 Ok(())
978 }
979}