1use std::path::{Path, PathBuf};
17
18use chrono::{DateTime, SecondsFormat, Utc};
19use serde_json::{Value, json};
20
21use crate::{
22 sessions::IngestEvent,
23 wire::{Message, Part, PartKind, Provenance, ProviderOptions, Session},
24};
25
26use super::{
27 Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
28 RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
29 empty_options,
30 extract::{Extracted, extract_compact_repr, extract_raw_record, extract_str},
31 extracted_text,
32 jsonl::{
33 BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events, peek_last_line, source_line,
34 },
35 jsonl_bytes, part_id, part_ordinal, raw_record,
36};
37
/// Adapter name: returned by both `name()` implementations in this file,
/// stamped on ingested sessions as `source_agent`, and passed to shared
/// helpers such as `config_path`, `jsonl_bytes` and `AdapterError::schema`.
const NAME: &str = "pi-coding-agent";
39
40pub struct PiCodingAgentFactory;
43
44impl AdapterFactory for PiCodingAgentFactory {
45 fn name(&self) -> &'static str {
46 NAME
47 }
48
49 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
50 Ok(Box::new(PiCodingAgentAdapter::new(config_path(
51 NAME, config,
52 )?)))
53 }
54
55 fn probe_default(&self, env: &Env) -> Option<Value> {
56 let path = env.home.join(".pi").join("agent").join("sessions");
57 path.exists().then(|| json!({ "path": path }))
58 }
59
60 fn serialize(
61 &self,
62 session: &crate::sessions::SessionWithMessages,
63 fidelity: RestoreFidelity,
64 ) -> Result<Vec<RestoredFile>, AdapterError> {
65 serialize_session(session, fidelity)
66 }
67}
68
69fn serialize_session(
70 session: &crate::sessions::SessionWithMessages,
71 fidelity: RestoreFidelity,
72) -> Result<Vec<RestoredFile>, AdapterError> {
73 let session_raw = raw_record(&session.session.options);
85 let actual = match fidelity {
86 RestoreFidelity::Native if session_raw.is_some() => RestoreFidelity::Native,
87 _ => RestoreFidelity::Foreign,
88 };
89
90 let mut records = Vec::new();
91 if actual == RestoreFidelity::Native {
92 records.push(session_raw.unwrap_or_else(|| pi_session_record(session)));
93 } else {
94 records.push(pi_session_record(session));
95 }
96
97 let mut messages: Vec<&crate::sessions::MessageWithParts> = session.messages.iter().collect();
100 if actual == RestoreFidelity::Native {
101 messages.sort_by(|left, right| {
102 source_line(left.message.options())
103 .cmp(&source_line(right.message.options()))
104 .then_with(|| by_timestamp_then_id(left, right))
105 });
106 } else {
107 messages.sort_by(|left, right| by_timestamp_then_id(left, right));
108 }
109
110 for message in &messages {
111 if actual == RestoreFidelity::Native
112 && let Some(raw) = raw_record(message.message.options())
113 {
114 records.push(raw);
115 continue;
116 }
117 if matches!(message.message, Message::System { .. }) {
122 continue;
123 }
124 records.push(pi_message_record(message));
125 }
126
127 Ok(vec![RestoredFile::new(
128 pi_relative_path(session),
129 jsonl_bytes(NAME, &records)?,
130 actual,
131 )])
132}
133
134fn pi_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
138 let source = session.session.options.get("source");
139 let slug = source
140 .and_then(|s| s.get("project_slug"))
141 .and_then(Value::as_str)
142 .map(ToOwned::to_owned)
143 .unwrap_or_else(|| encode_project(&session.session.project));
144 let file_name = source
145 .and_then(|s| s.get("file_name"))
146 .and_then(Value::as_str)
147 .map(ToOwned::to_owned)
148 .unwrap_or_else(|| {
149 let ts = session.session.created_at.format("%Y-%m-%dT%H-%M-%S-%3fZ");
150 format!("{ts}_{}.jsonl", session.session.id)
151 });
152 PathBuf::from("sessions").join(slug).join(file_name)
153}
154
/// Turns a project path into a directory slug by replacing every `/` and `.`
/// with `-`. All other characters are kept as-is.
fn encode_project(project: &str) -> String {
    project
        .chars()
        .map(|ch| if matches!(ch, '/' | '.') { '-' } else { ch })
        .collect()
}
158
159fn pi_session_record(session: &crate::sessions::SessionWithMessages) -> Value {
160 json!({
161 "type": "session",
162 "version": 3,
163 "id": session.session.id,
164 "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
165 "cwd": &*session.session.project,
166 })
167}
168
169fn pi_message_record(message: &crate::sessions::MessageWithParts) -> Value {
170 json!({
171 "type": "message",
172 "id": message.message.id(),
173 "parentId": message.message.options().get("source").and_then(|s| s.get("parent_id")),
174 "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
175 "message": pi_inner_message(message),
176 })
177}
178
179fn pi_inner_message(message: &crate::sessions::MessageWithParts) -> Value {
180 let epoch_ms = message.message.timestamp().timestamp_millis();
181 match &message.message {
182 Message::User { .. } => json!({
183 "role": "user",
184 "content": message.parts.iter().map(pi_content_item).collect::<Vec<_>>(),
185 "timestamp": epoch_ms,
186 }),
187 Message::Assistant { .. } => json!({
188 "role": "assistant",
189 "content": message.parts.iter().map(pi_content_item).collect::<Vec<_>>(),
190 "timestamp": epoch_ms,
191 }),
192 Message::Tool { .. } => {
193 let part = message.parts.first();
199 let (call_id, name, is_error, result) = match part.map(|p| &p.kind) {
200 Some(PartKind::ToolResult {
201 call_id,
202 name,
203 is_failure,
204 result,
205 }) => (
206 extracted_text(call_id).to_owned(),
207 extracted_text(name).to_owned(),
208 *is_failure,
209 result.clone(),
210 ),
211 _ => (String::new(), String::new(), false, Value::Null),
212 };
213 json!({
214 "role": "toolResult",
215 "toolCallId": call_id,
216 "toolName": name,
217 "content": result,
218 "isError": is_error,
219 "timestamp": epoch_ms,
220 })
221 }
222 Message::System { .. } => {
227 unreachable!("System messages are not serialized through pi_inner_message")
228 }
229 }
230}
231
232fn pi_content_item(part: &Part) -> Value {
233 match &part.kind {
234 PartKind::Text { text } => json!({"type": "text", "text": extracted_text(text)}),
235 PartKind::Reasoning { text } => json!({
236 "type": "thinking",
237 "thinking": extracted_text(text),
238 "thinkingSignature": part
239 .options
240 .get("pi")
241 .and_then(|p| p.get("thinking_signature")),
242 }),
243 PartKind::ToolCall {
244 call_id,
245 name,
246 params,
247 ..
248 } => json!({
249 "type": "toolCall",
250 "id": extracted_text(call_id),
251 "name": extracted_text(name),
252 "arguments": params,
253 }),
254 other => json!({
255 "type": "text",
256 "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
257 }),
258 }
259}
260
/// Adapter over a directory tree of pi-coding-agent JSONL session files.
#[derive(Debug, Clone)]
pub struct PiCodingAgentAdapter {
    /// Directory handed back by `JsonlTree::root`.
    root: PathBuf,
}

impl PiCodingAgentAdapter {
    /// Creates an adapter rooted at `root`.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root = root.into();
        Self { root }
    }
}
273
/// Both methods forward `self` to the shared `jsonl_tree_*` helpers.
impl Adapter for PiCodingAgentAdapter {
    /// Discovers sessions via `jsonl_tree_discover`.
    fn discover(&self) -> DiscoverFuture<'_> {
        jsonl_tree_discover(self)
    }

    /// Streams ingest events via `jsonl_tree_events`, passing `oracle` through unchanged.
    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
        jsonl_tree_events(self, oracle)
    }
}
283
284impl JsonlTree for PiCodingAgentAdapter {
285 type State = ();
288
289 fn name(&self) -> &'static str {
290 NAME
291 }
292
293 fn root(&self) -> &Path {
294 &self.root
295 }
296
297 fn peek_session_id(&self, _path: &Path, first_line: &str) -> Option<String> {
298 let row: Value = serde_json::from_str(first_line).ok()?;
299 if row.get("type").and_then(Value::as_str) == Some("session") {
300 row.get("id").and_then(Value::as_str).map(ToOwned::to_owned)
301 } else {
302 None
303 }
304 }
305
306 fn peek_last_ts(&self, path: &Path) -> Option<i64> {
307 let row: Value = serde_json::from_str(&peek_last_line(path)?).ok()?;
312 if row.get("type").and_then(Value::as_str) == Some("session") {
313 return None;
314 }
315 let text = row.get("timestamp").and_then(Value::as_str)?;
316 Some(
317 DateTime::parse_from_rfc3339(text)
318 .ok()?
319 .with_timezone(&Utc)
320 .timestamp_micros(),
321 )
322 }
323
324 fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
325 session_from_rows(path, rows)
326 }
327
328 fn events_from_row(
329 &self,
330 session: &Session,
331 row: &BoundedRow,
332 _state: &mut Self::State,
333 ) -> Result<Vec<IngestEvent>, String> {
334 events_from_row(&session.id, row.line, &row.value, session.created_at)
335 }
336}
337
338fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
339 let path_display = path.display().to_string();
340 let first = rows
341 .first()
342 .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
343 let row = &first.value;
344 let at_first = format!("{path_display}:{}", first.line);
345 if row.get("type").and_then(Value::as_str) != Some("session") {
346 return Err(AdapterError::schema(
347 NAME,
348 at_first,
349 "first row must be a `session` record",
350 ));
351 }
352 let id = row
353 .get("id")
354 .and_then(Value::as_str)
355 .ok_or_else(|| AdapterError::schema(NAME, at_first.clone(), "session record missing id"))?
356 .to_owned();
357 let created_at = row
358 .get("timestamp")
359 .and_then(Value::as_str)
360 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
361 .map(|dt| dt.with_timezone(&Utc))
362 .ok_or_else(|| {
363 AdapterError::schema(
364 NAME,
365 at_first.clone(),
366 "session record has no parseable timestamp",
367 )
368 })?;
369 let project = extract_str(row, "cwd").ok_or_else(|| {
370 AdapterError::schema(NAME, at_first, "session record missing cwd")
373 })?;
374
375 let project_slug = path
379 .parent()
380 .and_then(|p| p.file_name())
381 .and_then(|n| n.to_str())
382 .map(ToOwned::to_owned);
383 let file_name = path
384 .file_name()
385 .and_then(|n| n.to_str())
386 .map(ToOwned::to_owned);
387
388 let mut options = ProviderOptions::new();
389 options.insert(
390 "source".to_owned(),
391 json!({
392 "adapter": NAME,
393 "version": row.get("version"),
394 "project_slug": project_slug,
395 "file_name": file_name,
396 "raw_record": extract_raw_record(row),
397 }),
398 );
399
400 Ok(Session {
401 id,
402 parent_session_id: None,
403 parent_message_id: None,
404 source_agent: NAME.to_owned(),
405 created_at,
406 project,
407 options,
408 })
409}
410
411fn events_from_row(
416 session_id: &str,
417 line: usize,
418 row: &Value,
419 default_timestamp: DateTime<Utc>,
420) -> Result<Vec<IngestEvent>, String> {
421 let kind = row.get("type").and_then(Value::as_str);
422 let timestamp = row
423 .get("timestamp")
424 .and_then(Value::as_str)
425 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
426 .map(|dt| dt.with_timezone(&Utc))
427 .unwrap_or(default_timestamp);
428 let id = row
429 .get("id")
430 .and_then(Value::as_str)
431 .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
432
433 match kind {
434 Some("session") => Ok(Vec::new()),
436 Some("message") => {
437 let message_value = row
438 .get("message")
439 .ok_or_else(|| "message record missing `message` field".to_owned())?;
440 message_events(session_id, &id, timestamp, row, message_value, line)
441 }
442 Some("compaction") => Ok(vec![carrier_event(
445 session_id,
446 &id,
447 timestamp,
448 row,
449 line,
450 extract_str(row, "summary"),
451 )]),
452 Some("model_change") | Some("thinking_level_change") => Ok(vec![carrier_event(
453 session_id,
454 &id,
455 timestamp,
456 row,
457 line,
458 extract_str(row, "type"),
459 )]),
460 _ => Ok(vec![carrier_event(
464 session_id,
465 &id,
466 timestamp,
467 row,
468 line,
469 extract_str(row, "type"),
470 )]),
471 }
472}
473
474fn carrier_event(
475 session_id: &str,
476 id: &str,
477 timestamp: DateTime<Utc>,
478 row: &Value,
479 line: usize,
480 content: Option<Extracted<String>>,
481) -> IngestEvent {
482 IngestEvent::Message(Message::System {
483 id: id.to_owned(),
484 session_id: session_id.to_owned(),
485 timestamp,
486 content,
487 options: row_options(row, line),
488 })
489}
490
491fn message_events(
492 session_id: &str,
493 id: &str,
494 timestamp: DateTime<Utc>,
495 row: &Value,
496 message_value: &Value,
497 line: usize,
498) -> Result<Vec<IngestEvent>, String> {
499 let role = message_value
500 .get("role")
501 .and_then(Value::as_str)
502 .ok_or_else(|| "message missing role".to_owned())?;
503 let content = message_value
504 .get("content")
505 .and_then(Value::as_array)
506 .cloned()
507 .unwrap_or_default();
508
509 let mut parts = Vec::new();
510 let message = match role {
511 "user" => {
512 for (ordinal, item) in content.iter().enumerate() {
516 parts.push(user_part(session_id, id, ordinal, item));
517 }
518 Message::User {
519 id: id.to_owned(),
520 session_id: session_id.to_owned(),
521 timestamp,
522 options: row_options(row, line),
523 }
524 }
525 "assistant" => {
526 for (ordinal, item) in content.iter().enumerate() {
527 parts.push(assistant_part(session_id, id, ordinal, item));
528 }
529 Message::Assistant {
530 id: id.to_owned(),
531 session_id: session_id.to_owned(),
532 timestamp,
533 options: assistant_options(row, message_value, line),
534 }
535 }
536 "toolResult" => {
537 parts.push(tool_result_part(session_id, id, message_value));
538 Message::Tool {
539 id: id.to_owned(),
540 session_id: session_id.to_owned(),
541 timestamp,
542 options: row_options(row, line),
543 }
544 }
545 _ => Message::System {
548 id: id.to_owned(),
549 session_id: session_id.to_owned(),
550 timestamp,
551 content: extract_str(message_value, "role"),
552 options: row_options(row, line),
553 },
554 };
555
556 let mut events = Vec::with_capacity(parts.len() + 1);
557 events.push(IngestEvent::Message(message));
558 events.extend(parts.into_iter().map(IngestEvent::Part));
559 Ok(events)
560}
561
562fn user_part(session_id: &str, message_id: &str, ordinal: usize, item: &Value) -> Part {
563 let kind = match item.get("type").and_then(Value::as_str) {
564 Some("text") => PartKind::Text {
565 text: extract_str(item, "text"),
566 },
567 _ => PartKind::Text {
570 text: Some(extract_compact_repr(item)),
571 },
572 };
573 Part {
574 session_id: session_id.to_owned(),
575 id: part_id(message_id, ordinal),
576 message_id: message_id.to_owned(),
577 ordinal: part_ordinal(ordinal),
578 provenance: Provenance::Conversational,
580 options: empty_options(),
581 kind,
582 }
583}
584
585fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, item: &Value) -> Part {
586 let (kind, options) = match item.get("type").and_then(Value::as_str) {
589 Some("text") => (
590 PartKind::Text {
591 text: extract_str(item, "text"),
592 },
593 empty_options(),
594 ),
595 Some("thinking") => (
596 PartKind::Reasoning {
597 text: extract_str(item, "thinking"),
598 },
599 thinking_options(item),
600 ),
601 Some("toolCall") => (
602 PartKind::ToolCall {
603 call_id: extract_str(item, "id"),
604 name: extract_str(item, "name"),
605 params: item.get("arguments").cloned().unwrap_or(Value::Null),
606 provider_executed: false,
607 },
608 empty_options(),
609 ),
610 _ => (
612 PartKind::Text {
613 text: Some(extract_compact_repr(item)),
614 },
615 empty_options(),
616 ),
617 };
618 Part {
619 session_id: session_id.to_owned(),
620 id: part_id(message_id, ordinal),
621 message_id: message_id.to_owned(),
622 ordinal: part_ordinal(ordinal),
623 provenance: Provenance::Conversational,
624 options,
625 kind,
626 }
627}
628
629fn tool_result_part(session_id: &str, message_id: &str, message_value: &Value) -> Part {
630 Part {
631 session_id: session_id.to_owned(),
632 id: part_id(message_id, 0),
633 message_id: message_id.to_owned(),
634 ordinal: 0,
635 provenance: Provenance::Injected,
637 options: empty_options(),
638 kind: PartKind::ToolResult {
639 call_id: extract_str(message_value, "toolCallId"),
640 name: extract_str(message_value, "toolName"),
641 is_failure: message_value
642 .get("isError")
643 .and_then(Value::as_bool)
644 .unwrap_or(false),
645 result: message_value.get("content").cloned().unwrap_or(Value::Null),
648 },
649 }
650}
651
652fn row_options(row: &Value, line: usize) -> ProviderOptions {
653 let mut options = ProviderOptions::new();
654 options.insert(
655 "source".to_owned(),
656 json!({
657 "line": line,
658 "parent_id": row.get("parentId"),
659 "raw_type": row.get("type"),
660 "raw_record": extract_raw_record(row),
661 }),
662 );
663 options
664}
665
666fn assistant_options(row: &Value, message_value: &Value, line: usize) -> ProviderOptions {
667 let mut options = row_options(row, line);
668 options.insert(
669 "pi".to_owned(),
670 json!({
671 "api": message_value.get("api"),
672 "provider": message_value.get("provider"),
673 "model": message_value.get("model"),
674 "usage": message_value.get("usage"),
675 "stop_reason": message_value.get("stopReason"),
676 "response_id": message_value.get("responseId"),
677 }),
678 );
679 options
680}
681
682fn thinking_options(item: &Value) -> ProviderOptions {
683 let mut options = ProviderOptions::new();
684 if let Some(signature) = item.get("thinkingSignature") {
685 options.insert("pi".to_owned(), json!({ "thinking_signature": signature }));
686 }
687 options
688}
689
#[cfg(test)]
mod tests {
    // Tests are allowed to panic on broken fixtures or violated expectations.
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
    use tempfile::TempDir;

    /// Root of the checked-in pi-coding-agent session fixtures.
    const FIXTURES: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/fixtures/adapter/pi-coding-agent/sessions"
    );

    /// The factory's default probe looks under `<home>/.pi/agent/sessions`.
    #[test]
    fn probe_default_finds_pi_sessions_under_home() -> anyhow::Result<()> {
        let expected_segments = [".pi", "agent", "sessions"];
        crate::adapter::test_support::assert_probe_default(
            &PiCodingAgentFactory,
            &expected_segments,
        )
    }

    /// Runs the shared native-restore check against the fixture corpus.
    #[tokio::test(flavor = "multi_thread")]
    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
        let corpus_root = std::path::Path::new(FIXTURES)
            .parent()
            .expect("FIXTURES is nested under a corpus root");
        let adapter = PiCodingAgentAdapter::new(FIXTURES);
        crate::adapter::test_support::assert_native_restore(
            &PiCodingAgentFactory,
            &adapter,
            corpus_root,
        )
        .await
    }

    /// Ingesting the fixtures drops nothing and yields sessions, messages and
    /// parts, including tool calls, tool results and reasoning.
    #[tokio::test(flavor = "multi_thread")]
    async fn pi_coding_agent_adapter_ingests_fixture_corpus_into_canonical_shape()
    -> anyhow::Result<()> {
        let scratch = TempDir::new()?;
        let store = Store::open_local(scratch.path()).await?;
        let adapter = PiCodingAgentAdapter::new(FIXTURES);

        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
        assert!(summary.accepted() > 0, "ingest must accept rows");
        assert_eq!(summary.dropped_events, 0, "no per-event drops expected");
        assert_eq!(
            summary.dropped_sessions, 0,
            "no session-level rejections expected"
        );
        assert_eq!(summary.skipped_files, 0, "no whole-file skips expected");

        let (sessions, messages, parts) = store.row_counts().await?;
        assert!(sessions > 0, "at least one pi-coding-agent session");
        assert!(messages > 0, "at least one pi-coding-agent message");
        assert!(parts > 0, "at least one pi-coding-agent Part");

        let (mut saw_tool_call, mut saw_tool_result, mut saw_reasoning) = (false, false, false);
        for session_id in store.session_ids().await? {
            let session = store
                .get_session(&session_id)
                .await?
                .expect("session round-trips");
            assert_eq!(session.session.source_agent, NAME);
            assert!(
                !(*session.session.project).is_empty(),
                "spec.md#model-project-non-empty: project must be a real cwd",
            );
            for part in session.messages.iter().flat_map(|stored| stored.parts.iter()) {
                saw_tool_call |= matches!(part.kind, PartKind::ToolCall { .. });
                saw_tool_result |= matches!(part.kind, PartKind::ToolResult { .. });
                saw_reasoning |= matches!(part.kind, PartKind::Reasoning { .. });
            }
        }
        assert!(saw_tool_call, "corpus has assistant tool calls");
        assert!(saw_tool_result, "corpus has tool results");
        assert!(saw_reasoning, "corpus has assistant reasoning");
        Ok(())
    }

    /// A `message` row with an unrecognized role becomes a single `System`
    /// carrier holding the role string, with the raw record kept in its options.
    #[test]
    fn unknown_nested_message_role_becomes_system_carrier() -> anyhow::Result<()> {
        let fallback =
            DateTime::parse_from_rfc3339("2026-04-28T18:47:32.280Z")?.with_timezone(&Utc);
        let row = json!({
            "type": "message",
            "id": "mystery-message",
            "message": {
                "role": "mysteryRole",
                "content": [{"type": "text", "text": "not yet understood"}]
            }
        });

        let events =
            events_from_row("session-1", 42, &row, fallback).map_err(anyhow::Error::msg)?;

        assert_eq!(events.len(), 1);
        let IngestEvent::Message(Message::System {
            id,
            content,
            options,
            ..
        }) = &events[0]
        else {
            panic!("unknown role must produce a System carrier");
        };
        assert_eq!(id, "mystery-message");
        assert_eq!(content.as_deref().map(String::as_str), Some("mysteryRole"));

        let preserved_role = raw_record(options)
            .and_then(|raw| raw.get("message").cloned())
            .and_then(|message| message.get("role").cloned());
        assert_eq!(preserved_role, Some(json!("mysteryRole")));
        Ok(())
    }

    /// Two messages sharing a parent both keep their `parent_id`, keep their
    /// source-line order, and a `compaction` row lands as a `System` carrier
    /// holding its summary.
    #[tokio::test(flavor = "multi_thread")]
    async fn fork_parent_ids_and_compaction_summary_are_preserved() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let root = temp.path().join("sessions");
        let path = root
            .join("project")
            .join("2026-05-01T00-00-00-000Z_fork.jsonl");
        let records = [
            json!({
                "type": "session",
                "version": 3,
                "id": "pi-fork-session",
                "timestamp": "2026-05-01T00:00:00.000Z",
                "cwd": "/tmp/pi-fork",
            }),
            json!({
                "type": "message",
                "id": "parent-message",
                "timestamp": "2026-05-01T00:00:01.000Z",
                "message": {
                    "role": "user",
                    "content": [{"type": "text", "text": "parent"}],
                },
            }),
            json!({
                "type": "message",
                "id": "child-a",
                "parentId": "parent-message",
                "timestamp": "2026-05-01T00:00:02.000Z",
                "message": {
                    "role": "assistant",
                    "content": [{"type": "text", "text": "branch a"}],
                },
            }),
            json!({
                "type": "message",
                "id": "child-b",
                "parentId": "parent-message",
                "timestamp": "2026-05-01T00:00:03.000Z",
                "message": {
                    "role": "assistant",
                    "content": [{"type": "text", "text": "branch b"}],
                },
            }),
            json!({
                "type": "compaction",
                "id": "compact-1",
                "parentId": "child-b",
                "timestamp": "2026-05-01T00:00:04.000Z",
                "summary": "compact summary",
            }),
        ];
        write_jsonl_file(&path, &records)?;

        let store = Store::open_local(temp.path().join("store")).await?;
        let adapter = PiCodingAgentAdapter::new(&root);
        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
        assert_eq!(summary.dropped_events, 0);

        let session = store
            .get_session("pi-fork-session")
            .await?
            .expect("fixture session lands");
        let first_child = session
            .messages
            .iter()
            .find(|stored| stored.message.id() == "child-a")
            .expect("first fork child lands");
        let second_child = session
            .messages
            .iter()
            .find(|stored| stored.message.id() == "child-b")
            .expect("second fork child lands");
        for child in [first_child, second_child] {
            let parent_id = child
                .message
                .options()
                .get("source")
                .and_then(|source| source.get("parent_id"))
                .and_then(Value::as_str);
            assert_eq!(parent_id, Some("parent-message"));
        }
        assert!(
            source_line(first_child.message.options())
                < source_line(second_child.message.options())
        );

        let compact = session
            .messages
            .iter()
            .find(|stored| stored.message.id() == "compact-1")
            .expect("compaction carrier lands");
        let Message::System { content, .. } = &compact.message else {
            panic!("compaction is preserved as a System carrier");
        };
        assert_eq!(
            content.as_deref().map(String::as_str),
            Some("compact summary")
        );
        Ok(())
    }

    /// A session ingested from the opencode fixtures, serialized with foreign
    /// fidelity, is accepted again by this adapter without dropped events.
    #[tokio::test(flavor = "multi_thread")]
    async fn foreign_serialization_reparses_as_pi_coding_agent() -> anyhow::Result<()> {
        let temp = TempDir::new()?;

        // Source a session from a different adapter's corpus.
        let origin_store = Store::open_local(temp.path().join("origin-store")).await?;
        let origin = crate::adapter::OpencodeAdapter::new(concat!(
            env!("CARGO_MANIFEST_DIR"),
            "/tests/fixtures/adapter/opencode/storage"
        ));
        ingest_adapter(&origin_store, &origin, &crate::adapter::NoopOracle, |_| {}).await?;
        let session_id = origin_store
            .session_ids()
            .await?
            .into_iter()
            .next()
            .expect("opencode fixture has sessions");
        let session = origin_store
            .get_session(&session_id)
            .await?
            .expect("fixture session is readable");

        // Write it out in pi-coding-agent form, then ingest the result.
        let restored_root = temp.path().join("pi-corpus");
        let restored_files = PiCodingAgentFactory.serialize(&session, RestoreFidelity::Foreign)?;
        crate::adapter::write_restored_files(&restored_root, restored_files)?;

        let restored_store = Store::open_local(temp.path().join("restored-store")).await?;
        let restored_adapter = PiCodingAgentAdapter::new(restored_root.join("sessions"));
        let summary = ingest_adapter(
            &restored_store,
            &restored_adapter,
            &crate::adapter::NoopOracle,
            |_| {},
        )
        .await?;

        assert!(summary.accepted() > 0);
        assert_eq!(summary.dropped_events, 0);
        Ok(())
    }

    /// Tool results are `Injected`; tool calls and reasoning are
    /// `Conversational`. Other part kinds are not checked.
    #[tokio::test(flavor = "multi_thread")]
    async fn tool_results_are_injected_assistant_parts_are_conversational() -> anyhow::Result<()> {
        let scratch = TempDir::new()?;
        let store = Store::open_local(scratch.path()).await?;
        let adapter = PiCodingAgentAdapter::new(FIXTURES);
        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;

        for session_id in store.session_ids().await? {
            let session = store
                .get_session(&session_id)
                .await?
                .expect("session round-trips");
            for part in session.messages.iter().flat_map(|stored| stored.parts.iter()) {
                let expected = match &part.kind {
                    PartKind::ToolResult { .. } => Some(Provenance::Injected),
                    PartKind::ToolCall { .. } | PartKind::Reasoning { .. } => {
                        Some(Provenance::Conversational)
                    }
                    _ => None,
                };
                if let Some(expected) = expected {
                    assert_eq!(part.provenance, expected);
                }
            }
        }
        Ok(())
    }

    /// Writes `records` to `path` as JSONL, creating parent directories first.
    fn write_jsonl_file(path: &std::path::Path, records: &[Value]) -> anyhow::Result<()> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let bytes = jsonl_bytes(NAME, records)?;
        std::fs::write(path, bytes)?;
        Ok(())
    }
}